|
3.5 状态机库) e7 ]2 e" L- l3 {) n9 {, x$ ~2 k( [4 R
状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中- D2 Z3 H* p! D q( K
间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
; d1 R) H: M+ ]3 o态。 当状态机转换到最终状态时, 则退出。
& ? D6 L6 j, w9 t3.5.1 YARN状态转换方式
) m: _2 T7 U7 G% D/ X5 l在YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和9 b3 ?5 A3 H$ t* m+ E
回调函数( hook) 。 YARN定义了三种状态转换方式, 具体如下:
: d0 H/ N, ^" `1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
7 p; y$ J) o! w3 p9 ~! B函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。4 x( ~3 r- E; W- A4 c9 t" [
图3-18 初始状态:最终状态:事件=1:1:15 g+ z6 A3 B4 Y# Y' W8 B% F
2) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
* ]6 B6 ^7 n4 L, y1 Z3 J% U: ^0 x函数状态转移函数Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。
2 U* U) K, Y* R: F2 e( c2 ~, v) m图3-19 初始状态:最终状态:事件=1:N :1( N* {4 T+ |- k% F+ E: d! K* ~: A
3) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1、 Event2和
# d* j0 [0 a, l: a* B( fEvent3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。
& ?) g3 [% X9 T; X图3-20 初始状态:最终状态:事件=1:1:N
8 y( h& f8 Q2 G3.5.2 状态机类& B ^7 U/ g5 Q
YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了
# @) I9 P5 g$ M( d* m& f3 ]一个状态机工厂StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调
3 @5 K" S0 }7 g; U( C3 l3 b. U. E用installTopology完成一个状态机的构建。- z" O2 X% L/ H( m4 S) X3 t* `
图3-21 状态机类图
9 P h2 Y4 s1 W% ^: s3.5.3 状态机的使用方法 E( i5 r) ^9 g5 N& R6 j4 x
本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态
6 F2 Z- U7 K8 t% ^' `, @变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度8 H+ s, Q; k) j- K( E! a$ N
器, 可以嵌到3.4.3节的实例程序中运行。% U% U/ C8 ^* v. g6 J- g; n( D/ r- S
1) 定义作业类型。
! o" E' c' A6 z6 Qpublic enum JobEventType {
5 p) W% U5 Z! }9 _# C. P0 F0 \JOB_KILL,
" `# V: e' V" a- k% rJOB_INIT,4 p' |/ v+ }. ]3 ` ^3 R$ D
JOB_START,* ]" z" V0 Y9 v
JOB_SETUP_COMPLETED,
# x1 e7 V* C& U. o! D! Q& g9 k! A' ]JOB_COMPLETED
6 K; S, w- b% [}
5 q I% w3 C* J. i2) 定义作业状态机。
6 Q/ t* d! O" r' {@SuppressWarnings({ "rawtypes", "unchecked" })
) Z+ ?- Z1 O- ^7 opublic class JobStateMachine implements EventHandler<JobEvent>{" X( C& s! p. |( x9 J
private final String jobID;
6 t ^: ^3 s6 g3 E+ l# I6 Kprivate EventHandler eventHandler;* \& p4 S* k7 Q
private final Lock writeLock;
* R6 b6 m$ U8 |# }" [private final Lock readLock;- j, b) p! R# J1 Q. u
// 定义状态机
; d2 H% j* |2 P6 r6 {* `protected static final" n7 W1 Y& e4 E/ C5 A
StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>" x) e) B0 n. G- y+ v
stateMachineFactory- K/ N, O2 u$ F, x% S
= new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>
9 U' P* P2 k0 \6 l/ b( K6 O" l/ A5 t(JobStateInternal.NEW)/ S) W! `( l. v8 V
.addTransition(JobStateInternal.NEW, JobStateInternal.INITED,* G$ B5 Q/ o! c3 c+ o* w
JobEventType.JOB_INIT,
, k+ w8 {7 j3 }4 i! y$ h( mnew InitTransition())
) u- C w" j+ Q7 m1 X4 d.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
/ b" w* S4 I5 @4 s4 [; o" gJobEventType.JOB_START,
+ y7 U6 @9 a, G3 onew StartTransition())
+ [/ T; E9 I6 I) \.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING, U' Y/ X- z( B( R( ^# C
JobEventType.JOB_SETUP_COMPLETED,- C% r' I2 u- w- n% [2 j( Z
new SetupCompletedTransition())
5 {+ m) E$ U6 Y& f( Q# H.addTransition' n, x8 e9 r6 I6 H6 B- f
(JobStateInternal.RUNNING,: \# q3 Q3 t2 J. j, q8 S/ x
EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),) P# [" {- |% Z& g: O$ s
JobEventType.JOB_COMPLETED,* y. q3 H. Q5 ]2 ?4 p
new JobTasksCompletedTransition()): c! }5 q3 i& r
.installTopology();
# D! s( i: Z* C- f' nprivate final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
7 c' D2 r8 o. ]: t1 q# Z5 [public JobStateMachine(String jobID, EventHandler eventHandler) {, Q7 \: H; q! {; G; I% _
this.jobID = jobID;
# T4 x! {& I; W: G+ N7 cReadWriteLock readWriteLock = new ReentrantReadWriteLock();
7 ]) R! r, V) G0 S0 rthis.readLock = readWriteLock.readLock();/ J4 T8 J/ k8 a5 n5 K
this.writeLock = readWriteLock.writeLock();
( h' p; n" v* r$ U* a; {this.eventHandler = eventHandler;3 A% c; I& W& y0 t+ n! b
stateMachine = stateMachineFactory.make(this);
* e) @5 c+ p# w/ @}p
/ F. N) r1 Y N$ U% j. m5 K! irotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {! ]; H1 k l; A& @
return stateMachine;
0 j8 a5 d0 B+ o: O& t9 o} p
% ?; t2 N0 y& ?+ L% R: Z9 uublic static class InitTransition
1 n Q6 W/ @' h' d+ wimplements SingleArcTransition<JobStateMachine, JobEvent> {/ c2 C# b, w1 k8 i' z" a2 i
@Override ]; I+ @8 @* R5 [0 Q( h/ k2 {% d" e3 M0 C
public void transition(JobStateMachine job, JobEvent event) { t. ~8 T9 u1 i* F
System.out.println("Receiving event " + event);
- a8 L% \2 E* V, R6 }; qjob.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));
- n `9 \2 i8 R# j}
7 N- m7 @) [, M} p: h" S! q8 T3 s, @ E* f
ublic static class StartTransition- o: @5 m9 C5 _& c. J( U6 A; l' U
implements SingleArcTransition<JobStateMachine, JobEvent>>{
( [( X: I+ r" U( a@Override% j# R0 S2 x5 m
public void transition(JobStateMachine job, JobEvent event) {
4 t5 N7 L- D6 \" [5 s) D7 ]5 fSystem.out.println("Receiving event " + event);
, L* d8 b O" E: M% p1 _' fjob.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));5 K, `3 I4 q2 ^3 u, L" F
}
1 |6 G; s' n' T! Y1 _}…/9 h* @7 ]8 E g* A% M
/定义类SetupCompletedTransition和JobTasksCompletedTransition
8 X2 l- t+ f2 Q@Override
0 v c1 {. t5 J4 [: C! q: i8 B+ |public void handle(JobEvent event) {
! I7 F! h/ ~7 i8 k. i" }+ ztry {
4 e' \* m8 Z% H6 JwriteLock.lock();. ?# M" n+ t* o7 J! V3 O1 }7 K" E. C
JobStateInternal oldState = getInternalState();
2 z( A% v' h! J% t: |try {
0 c: S F" z. Z8 XgetStateMachine().doTransition(event.getType(), event);$ x/ H f/ e J- K
} catch (InvalidStateTransitonException e) {1 J% H5 z3 T, ?% W* S% A
System.out.println("Can't handle this event at current state");1 _( {7 C, a2 `5 y7 b
}i
/ T9 t8 X- I! B% \+ @f (oldState != getInternalState()) {/ J' l6 x; }. F r: Y) e
System.out.println("Job Transitioned from " + oldState + " to "
# e+ L2 J. h) ]- g+ getInternalState());1 B8 C, {* y6 b, M- t" _2 u
}! I; R+ @* V/ l* S$ D3 _6 m0 q
}f, L+ n6 i( a! ?8 h
inally {4 L7 Q. u6 l/ M' {, U8 a% \0 G
writeLock.unlock();
; T! q4 R( {% O5 \}0 h' h1 g G+ i/ N0 i! f
} p0 g9 T6 m$ L, Q9 ?. q( o5 \
ublic JobStateInternal getInternalState() {* |$ \: ?! ^1 Y* }9 B% {
readLock.lock();* b% u. E- c$ L4 ~7 N! F7 R6 \
try {- \( U% d) ^% w0 z' C2 x. ]$ f: J
return getStateMachine().getCurrentState();1 Y8 b( }; r- t1 w5 _
} finally {; a% W" `( _7 Y2 H
readLock.unlock();
& Y% w y5 ^! r/ b, K7 N}5 H5 _* W; U U! q+ J# p7 e. I
}p( g' o1 f) b) B& r
ublic enum JobStateInternal { //作业内部状态0 L' A7 X" X2 U6 }
NEW,
$ C' R4 ~) o5 o$ f `0 LSETUP,
" F" T8 f/ k4 g/ S6 g$ fINITED,
# n# K; M3 [ F7 _8 z6 d3 ORUNNING,
* M4 }* z5 M* @" @3 fSUCCEEDED,; X9 J& Y; ^( s/ i5 h5 E
KILLED,
9 p" e* u+ l, G i}; i' N8 s6 k) b# ]: C& Y6 W/ o
}
7 }2 L* W3 }! e9 [6 N3.5.4 状态机可视化/ o) i4 ]. k5 a8 L1 B9 r
YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImpl、 RMApp-AttemptImpl、 RMContainerImpl和
. y# ?# d* l$ DRMNodeImpl, NodeManager中的ApplicationImpl、 ContainerImpl和LocalizedResource, MRAppMaster中的JobImpl、 TaskImpl和0 ]" {- Z5 W- I) e' @, Z7 X' A
TaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操5 u g6 z2 ^, O
作步骤如下。
5 S1 R( V4 z$ O6 J步骤1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:. B5 j, K v6 s Q) [
mvn compile -Pvisualize
9 ^, x' o8 c n经过该步骤后, 本地目录中生成了ResourceManager.gv、 NodeManager.gv和MapReduce.gv三个graphviz格式的文件( 有兴趣可以& a; i3 b' ~% U7 I% H
直接打开查看具体内容) 。
5 g: r& i& c9 ?步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:
( v* O- N: |+ G C! W9 s9 ~+ S# |dot -Tpng NodeManager.gv > NodeManager.png
5 Y7 r" Z' @6 m8 p如果尚未安装graphviz包, 操作该步骤之前先要安装该包。! z" L' n: @: u9 x/ I5 r9 w0 f
注释& U: z, W. k0 y+ p7 i! r& G* N
[16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930。
+ T8 \$ m4 x% }+ K$ A, T! V
' Q y$ H1 C$ W/ M% d+ o, L" f& p; S8 A
|
|