|
3.4 服务库与事件库/ K3 I% ^' Y$ D6 Q& I( Z( b' V
本节介绍服务库和事件库。
3 z, b0 A8 f, D8 f! z' d% |: l3.4.1 服务库
* [6 o3 A9 V. F- G7 V3 C/ J2 ?对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。
/ ^6 d' @6 U3 P3 O+ X3 E: s❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、% {) u3 E. w. O" T
STOPPED( 已停止) 。 u& ?$ H6 N! W, }3 I+ z2 e
❑任何服务状态变化都可以触发另外一些动作。3 s1 [3 w# n: b9 c6 v! i: `
❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。
' \: ^ G; {! n& c" kYARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务2 c. G9 V5 P' u- q# [
对象最终均实现了接口Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的5 {0 S5 d# Y) s, k& a& ?5 _
Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于3 }" P! i) F7 ^! b; m1 y* [
ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMService、 ApplicationMasterLauncher、
0 _3 Y: s/ C' d, gApplicationMasterService等。% M# T# } M J& w/ c( A$ y
图3-13 YARN中服务模型的类图
! g( X7 }6 d( j! Z0 V; Y在YARN中, ResourceManager和NodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服# x W, O5 o" d T* E+ P
务的统一管理。" F# G9 z8 ]0 w: k& _$ [/ L
3.4.2 事件库1 E% V6 f6 O" O1 s5 J) ~
YARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN将
* n% Y, f$ e4 B4 y2 n: v1 [/ ]5 _各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。 YARN中的事件处
0 }7 w/ q& O G" a) E& `理模型可概括为图3-14所示。! Y J! A0 ]8 [- e# ]; K2 U! z$ D
图3-14 YARN的事件处理模型
6 K9 `% N% I0 X& t; R5 N* `整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器( Async-Dispatcher) 负责传递给相应事件调度器0 L* ^# C D: u0 x
( Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其/ u6 }$ |( P( ]' b3 W" i
处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成+ U/ _9 Y4 {* W8 L( L
( 达到终止条件) 。
- j% Z5 \+ N7 P6 d; d- _; H在YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManager、 NodeManager、
8 L/ X o0 Z- `7 C5 [. q! eMRAppMaster( MapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型
8 `& W, Z+ f0 `" E' t驱动服务的运行。6 }% y6 r2 h6 F# ` u0 f
YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先
R0 y6 | M/ w; {* m- {5 A9 P要定义一个中央异步调度器AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器, S- V, @+ s7 F
EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器
, X; b7 o e) W' B2 q2 F/ s- FAsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImpl、 TaskEvent/TaskImpl、 JobEvent/JobImpl等一系列事件/事件处理器, 由
g( I m: _& x& f中央异步调度器统一管理和调度。4 O/ h. `, w( {* f" K/ i
服务化和事件驱动软件设计思想的引入, 使得YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间
4 Q: ?' z' Z m- L8 I则采用事件联系起来, 系统设计简单且维护方便。
( [" L2 `# Q& n) Y, J图3-15 事件与事件处理器4 o& J. c g1 k/ g/ h y, J( z
3.4.3 YARN服务库和事件库的使用方法: g( {8 g' E* i7 t( J- Z ], W
为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce
5 O. h5 {: d a9 f ]7 u2 {ApplicationMaster( MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
' B, s% {; S& X1 `, [1) 定义Task事件。
4 ?8 W, t, c* ?8 E( Vpublic class TaskEvent extends AbstractEvent<TaskEventType> {
, _' o: k. h: U9 n5 Lprivate String taskID; //Task ID
% T! f. `/ Q7 |) d# U2 P7 Apublic TaskEvent(String taskID, TaskEventType type) {$ g( ]5 T& r5 Q2 l. l) c- M
super(type);, L1 E0 @( \+ ]' g- j, e
this.taskID = taskID;) g f5 X1 ~" Z- c, f3 o, X6 ~
} p
* A9 V u' m" O! o( nublic String getTaskID() {$ n" z2 W4 _9 j: @- Y1 h
return taskID;$ |. ?( a% n. G- x) `4 E
} 其) @ D' _4 F0 S
中, Task事件类型定义如下:* K; G3 g0 h0 s5 Q
public enum TaskEventType {$ L. x( n6 K0 w9 s
T_KILL,! |! O- L4 l& W
T_SCHEDULE a) a5 E; W" q8 d+ p7 j: `
}
( p3 `8 W( Q( u6 o2) 定义Job事件。
! \0 [! [' ]) Qpublic class JobEvent extends AbstractEvent<JobEventType> {
( e! N' D& b' u: q8 P# M6 i; tprivate String jobID;7 r& q# @0 Q3 C* G! c
public JobEvent(String jobID, JobEventType type) {
! E6 Z' H# \' b+ U3 m5 Vsuper(type);3 O$ P" ]( W4 G
this.jobID = jobID;9 V9 \* z+ q2 X- o
} p
2 q1 A! o2 @. @7 [ \$ Bublic String getJobId() {
, T1 U" {( S8 freturn jobID;
; E, L, B4 b5 ^% B* q4 J}
( O; u! D0 v0 g3 Q/ u}
" h* y9 \( d5 U( f( O" w其中, Job事件类型定义如下:
9 {8 j0 I$ a8 T2 K) O; zpublic enum JobEventType {
/ b# {4 D$ b, Q! C# Q; _JOB_KILL,
& c2 s4 k) Y6 _/ ]! b4 ], cJOB_INIT,
$ Y& T; y h3 F9 W$ N9 y& PJOB_START6 A( S3 b7 u% O8 j6 C
}7 N* [+ y3 B; `) s
3) 事件调度器。
' S8 ]- v8 C) o- p3 D5 n- e接下来定义一个中央异步调度器, 它接收Job和Task两种类型事件, 并交给对应的事件处理器处理, 代码如下:
# ^( c8 G5 w+ X$ R, a% O, n@SuppressWarnings("unchecked")
0 M# a% j0 a7 z$ J& Y6 Z* }public class SimpleMRAppMaster extends CompositeService {
- Z& S& n9 `, E0 _ Q% w7 x& iprivate Dispatcher dispatcher; //中央异步调度器
1 J1 v2 g+ u9 h, ~& _private String jobID; V1 ~" q) u. K. w" h9 u5 f
private int taskNumber; //该作业包含的任务数目2 e5 s. l1 S/ j T8 Z' h
private String[] taskIDs; //该作业内部包含的所有任务
% C6 e1 @# O1 L$ b( B7 vpublic SimpleMRAppMaster(String name, String jobID, int taskNumber) {
7 ~6 y/ v- f o9 Q/ m4 r5 lsuper(name);4 s9 Q- P) p6 u. a
this.jobID = jobID;% m6 L4 y$ g& k( x- F
this.taskNumber = taskNumber;
0 B% z, B1 R6 FtaskIDs = new String[taskNumber];9 w3 \0 K% z: h
for(int i = 0; i < taskNumber; i++) {
" r% K) K v/ otaskIDs = new String(jobID + "_task_" + i);4 I8 g/ g% ^" O4 u& U7 [
}1 f X- ?7 _0 j4 S2 @! [' [
} p
^+ F' O, b4 ^# s% v- R Eublic void serviceInit(final Configuration conf) throws Exception {7 \: O) ]8 ^% `2 V
dispatcher = new AsyncDispatcher();//定义一个中央异步调度器" W* o7 s: P1 r- E* K8 r
//分别注册Job和Task事件调度器2 B- i3 @+ N! U) p, B1 K
dispatcher.register(JobEventType.class, new JobEventDispatcher());1 v" Y( p1 S; x5 j
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());* a" Y/ D+ v6 R, m
addService((Service) dispatcher);- Z5 R3 D( j6 ?) w) j
super.serviceInit(conf);2 i! H5 h9 J2 Y5 v& \
} p
! W# m9 O' y/ y3 A5 Hublic Dispatcher getDispatcher() {
+ s( B! d- J7 r) f: }: Z2 u9 _return dispatcher;8 C: b! C+ C# ^, n3 t
} p' [2 w" E$ I' S7 k& l
rivate class JobEventDispatcher implements EventHandler<JobEvent> {
- A, q; C, T3 b+ k@Override" m, h& X/ r+ a8 K: X9 K
public void handle(JobEvent event) {0 J8 B# w9 ~7 `( m! q& \) |: F
if(event.getType() == JobEventType.JOB_KILL) { Q6 g( l$ {+ m8 E4 s# Q+ G
System.out.println("Receive JOB_KILL event, killing all the tasks");7 O$ [! C# I. v7 G- _
for(int i = 0; i < taskNumber; i++) {/ ^6 z' X# [+ @; {
dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
; I$ c& a) N8 zTaskEventType.T_KILL));
: r2 S6 [; U( J' X8 s}4 m2 c, N$ [' `/ j
} else if(event.getType() == JobEventType.JOB_INIT) {
% T3 g2 O \% j6 [; _0 W/ a% eSystem.out.println("Receive JOB_INIT event, scheduling tasks");
/ R, F! J# s' ?3 wfor(int i = 0; i < taskNumber; i++) {
3 m! m( u _: J; W# Zdispatcher.getEventHandler().handle(new TaskEvent(taskIDs,% g8 }, K! C9 `
TaskEventType.T_SCHEDULE));
6 v3 q0 r" f/ h}1 b- Y9 A d5 G) O, Y
}: {7 ^+ `7 _! c
}
1 Q1 `5 C- b5 I' @: A6 o}p7 | p- N) D. l( p! d
rivate class TaskEventDispatcher implements EventHandler<TaskEvent> {1 U. H/ n/ f6 r0 g9 c
@Override
1 `/ M7 s+ A) @/ G& a9 e& {# q# {2 \public void handle(TaskEvent event) {! U. y) _ x% A
if(event.getType() == TaskEventType.T_KILL) {% X6 E/ j2 Y3 |1 f
System.out.println("Receive T_KILL event of task " + event.getTaskID());5 i! g. v+ D& _
} else if(event.getType() == TaskEventType.T_SCHEDULE) {
- Y& B% B$ ~7 P4 J, VSystem.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());
9 |* f! K, h5 I7 b2 F N$ C' y}
" D$ e5 \* Z% Q k) q N% y}
, R" X. P' }4 M$ o1 M3 P- o}0 \. c+ j1 S' g G
}
: k0 P; E: |6 k* @% m5 b8 L4) 测试程序。7 E" z" d! W @4 z! f5 f& g0 k$ o
@SuppressWarnings("unchecked")+ E2 M( x9 K2 k2 I/ W) g, r
public class SimpleMRAppMasterTest {
% x4 g% {( d2 |6 Apublic static void main(String[] args) throws Exception {
2 H5 ~( d* f0 S) ]- DString jobID = "job_20131215_12";
; w: k' ]1 ^. i8 @" R* K6 H5 `6 `SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);
* {5 w: v# @! |2 N4 x8 TYarnConfiguration conf = new YarnConfiguration(new Configuration());
3 {* h/ \( U, h5 D- y$ M0 b) lappMaster.serviceInit(conf);
) B. C- _0 {3 G9 G& |appMaster.serviceStart();
" J7 f: `, k. q8 W% X6 @/ UappMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
* p; `" S, [3 z \1 x/ c7 W6 nJobEventType.JOB_KILL));
3 H2 l$ `9 ]6 \; i0 r5 h ]3 I0 j# RappMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
# P0 a1 \+ @) X' @) K R$ SJobEventType.JOB_INIT));
2 C4 ~/ u9 C* ?9 t! f! g" u8 K! Q}- _, A; F& h+ |2 k! [
3.4.4 事件驱动带来的变化' \* p, V/ p; c# ?1 N
在MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的' J% ^- l( U8 J, R; m
方式, 且整个过程是串行的。 比如, 当TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
4 d+ W- j* @1 g# @: w字典文件等) 、 然后执行Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
( g' C( }( |+ S是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一- O5 k7 K+ S9 ^: }& |& V
个独立进程运行该任务。 尽管后来MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决8 w- R% j( [! H
问题之道, 必须引入新的编程模型。7 H- i" N# ~' e; o0 Z
图3-16 基于函数调用的工作流程6 V# Q: h, o7 o+ ~' \# J
基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下, MRv2引入的事件驱动编程模型则
% N, C7 g* p9 R$ j" [是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关; k5 _+ v1 ^+ w/ ^. `
联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图3-17所示, 当A需要下载文件时, 只需
5 A5 {& N) p2 G( v3 W! |" K. l向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理
7 v/ I) \2 A- W6 k# O6 [器B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A。
8 W0 I+ Y! P2 o# ?" l图3-17 基于事件驱动的工作流程" j: t( E9 k9 ^+ p& {
相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。
: Y, B6 e8 @- V$ F[15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705。 ' X, U3 u! ?) l, R+ @/ L
! C% p W. v D E7 V7 c: w0 T5 H+ p
|
|