|
3.5 状态机库
( R! J- @/ V9 \7 Z5 ?状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中: b: E7 j# q: C% F" N7 P
间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
! a+ p! R( _6 o态。 当状态机转换到最终状态时, 则退出。) z) {! }" ?% o; h
3.5.1 YARN状态转换方式
; U# }; o& I a5 N在YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和
: ], X! z" ~0 C2 |& @; w回调函数( hook) 。 YARN定义了三种状态转换方式, 具体如下:
; c) q. p7 M2 d/ [: Y1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
! o* l7 U$ Z& E: E函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。
' X7 ?) i- ~) f图3-18 初始状态:最终状态:事件=1:1:1* F6 z8 ^3 u9 b0 Q, t
2) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行2 x6 L- f& T% p2 O) y. B* o
函数状态转移函数Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。- N& z4 Y1 ~3 w$ _2 t% Z
图3-19 初始状态:最终状态:事件=1:N :1
; N1 H$ Z4 W X5 V3 R j( N3) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1、 Event2和
* [6 |' x$ x4 }9 DEvent3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。8 U, u! g( V: f2 C
图3-20 初始状态:最终状态:事件=1:1:N
. {; V0 S$ U$ h% h1 d. q3.5.2 状态机类
3 I+ M% Y8 O+ b% h& R: @- IYARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了
" b7 b6 j0 m1 `2 @一个状态机工厂StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调$ ?; L6 A' T6 Y/ _+ M8 J
用installTopology完成一个状态机的构建。
9 S2 W- S7 X1 j9 \+ ]7 b图3-21 状态机类图
# T% G$ g6 q" M+ C# t- ` a1 _3.5.3 状态机的使用方法
4 b; w& D4 z" Y7 u8 q本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态$ y) p: @- [# T! F; g( g
变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度
8 R* W' L1 s2 i( }6 c* t7 N* Z器, 可以嵌到3.4.3节的实例程序中运行。2 [* o5 g2 K6 R* U5 Q
1) 定义作业类型。
& w: E0 M1 u' k4 {: npublic enum JobEventType {, T3 W8 r, {7 l Q& Q
JOB_KILL,
# E; c* B0 w8 U% G& N9 ^4 Q' S* n, f1 pJOB_INIT,
' o3 D) H$ j4 \1 w0 E& y5 v8 p7 ~JOB_START,4 ?/ ~! t. @: A5 j9 |; G. x
JOB_SETUP_COMPLETED," e7 c( T( F, f" F& P$ I# `% h
JOB_COMPLETED
+ q) f" d6 w! h, D+ X1 I}/ D7 O/ ^+ ?. P* s* o! i q$ p
2) 定义作业状态机。0 ~% A J, p3 k6 K1 d8 i: H C* N
@SuppressWarnings({ "rawtypes", "unchecked" })% }9 d7 f+ E0 C- h4 {
public class JobStateMachine implements EventHandler<JobEvent>{
6 t" r; _( k( J6 \" Eprivate final String jobID;3 `3 p9 d' F3 V& g) O, p
private EventHandler eventHandler;
8 s# n; u4 O" a% z) }0 Nprivate final Lock writeLock;
; L* z4 H2 A2 ~4 k2 @1 Gprivate final Lock readLock;
1 b0 m2 W1 O( H& _9 I" y8 z// 定义状态机
: S" L( f( Y; \; ]protected static final& s4 }8 \' W' d/ C j' k1 i- E
StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>8 P: k" K& [/ v+ L3 O$ m- M# `
stateMachineFactory) L2 p# Z1 a2 }
= new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>0 @- @7 i9 \0 n% w4 n0 c& F4 V
(JobStateInternal.NEW)
% m ]) y/ L; ?) T- m.addTransition(JobStateInternal.NEW, JobStateInternal.INITED,! x) @6 ?4 t" u+ B
JobEventType.JOB_INIT,
3 D$ Z9 F$ O! G' j, F1 U" e" Tnew InitTransition())
/ R3 Z1 }& ~0 c; E p3 a) m5 }.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
4 M7 w' w- W4 c) O7 ]JobEventType.JOB_START,
& o! i& @- s8 @8 t6 O$ D& enew StartTransition())) m& L7 q% [1 W6 b9 B' B% E
.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,
$ z7 S: P, N, ], @- L9 zJobEventType.JOB_SETUP_COMPLETED,3 w- k( M% R9 [& `% r
new SetupCompletedTransition())
: w% l# E. `) |8 r.addTransition
/ Y8 ^7 s8 e4 x* F) v(JobStateInternal.RUNNING,
1 f7 q* Y j& e( y$ I. aEnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),3 G3 W; J( a& ~/ V; J
JobEventType.JOB_COMPLETED,3 b0 T4 S2 r ^. e* C* ^7 W5 L. _* T6 ?
new JobTasksCompletedTransition())5 f2 n6 ~+ @+ p* i2 R2 b* A( q
.installTopology();! @/ R2 f9 a2 i$ l
private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
3 K% f/ h) ^/ C( z$ lpublic JobStateMachine(String jobID, EventHandler eventHandler) {
. o% E8 y& o T- x, ]this.jobID = jobID;
4 M1 c$ V) D: B# I0 BReadWriteLock readWriteLock = new ReentrantReadWriteLock();: i5 w7 ]$ @- p% z" T
this.readLock = readWriteLock.readLock();5 v6 }# h* C: Q! y) Y' K
this.writeLock = readWriteLock.writeLock();4 ]+ {- C0 @- }) ~; ]3 y
this.eventHandler = eventHandler;+ }; [! t, S5 d( y9 [4 @" F
stateMachine = stateMachineFactory.make(this); b1 D, \* X; D& j, M; h0 C
}p, }4 C6 n$ _4 w7 a8 O0 ~
rotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {! U" ?6 I$ [: a9 n8 i
return stateMachine;
+ C/ t, a& n/ [} p
- Q& i- l3 f! z, w0 uublic static class InitTransition; \. |" u' e1 {7 N! T
implements SingleArcTransition<JobStateMachine, JobEvent> {
: z& ~( z+ k3 O. W: d@Override; c5 Z) Y6 k2 w4 U/ Z/ L1 I; M
public void transition(JobStateMachine job, JobEvent event) {
9 Q1 u& {8 K y( f; DSystem.out.println("Receiving event " + event);3 C& X, r) R6 A$ `' J: l
job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));1 }( O* H* U0 w# {3 ~
}
: e' x7 ~; R u" ?& t% r! x# t} p
5 q6 W, S# E$ _# o9 zublic static class StartTransition
5 L- U# _5 ?( I" g" C8 M6 ?implements SingleArcTransition<JobStateMachine, JobEvent>>{
1 s) z2 _5 n$ l) L+ B& k! h@Override
7 E* B9 W& K3 Q- Lpublic void transition(JobStateMachine job, JobEvent event) {
% i% ^' h) I1 q1 @1 }7 l, hSystem.out.println("Receiving event " + event);
0 l+ \/ u" @1 t6 F, C8 ojob.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));
0 w( q2 x; d9 ?* J. @/ l/ z$ [}
# A/ G% e6 T) F& f+ H}…/6 L$ M4 D; n3 @# z4 b' j$ K8 ^
/定义类SetupCompletedTransition和JobTasksCompletedTransition! w5 e8 _" y+ Y. b% s5 z
@Override6 B$ j5 R9 ]& }$ k7 Z4 Z/ q
public void handle(JobEvent event) {: _: o6 I2 @0 E v
try {* U9 ^4 c3 R$ Y
writeLock.lock();+ r" h! h8 n5 ^
JobStateInternal oldState = getInternalState();
. E p! k' t& o/ S6 X! xtry {- o d! T4 G8 R! @: B
getStateMachine().doTransition(event.getType(), event);6 T+ v$ E/ k. g. x2 _7 C% t
} catch (InvalidStateTransitonException e) {7 v0 Q3 `& h8 ^! J$ b; ~
System.out.println("Can't handle this event at current state");
/ X% G, i& q8 S$ h7 L}i" r; D, ]4 h( }! w
f (oldState != getInternalState()) {( f' i5 v/ ]: {
System.out.println("Job Transitioned from " + oldState + " to "
" n* ?' l- G- {) u* @( _+ getInternalState());
% |5 e; u( y) K Z! M}
: C6 y( f4 W* {; Y2 ~5 @2 `) }}f
/ N4 H, }/ W9 T0 P4 I9 @- Einally {0 G+ B( h; d' Q3 v7 U6 X
writeLock.unlock(); ~3 n c* H8 h* J1 m, H
}
' q' h6 n; q. F/ Y3 j9 |2 k} p
5 g- \ g' g- o8 jublic JobStateInternal getInternalState() {
8 ]5 T( ~; e( \: dreadLock.lock();
6 w$ U2 Q2 J% }1 z# Htry {
- `( I5 n* S+ `return getStateMachine().getCurrentState();
8 { L, x5 ]5 u1 P} finally {
: D* e/ p- @9 ]' G$ \* e/ L$ h7 h* C# LreadLock.unlock();5 d: v+ _6 F( L2 O5 L' d
}4 C* Z, G4 V% D1 y/ b3 z9 w
}p: p. K4 \+ i0 ]$ S
ublic enum JobStateInternal { //作业内部状态) `# h/ z+ w; t% D5 P
NEW,
. o5 e5 y. c) P; T5 u+ x8 `& |6 ASETUP,7 A2 K! U2 H" C( n; Y! i" E6 w
INITED," |2 K( @. Q8 @( M7 Z3 {( {; v% r
RUNNING,& m' T9 J% @; q( X+ i
SUCCEEDED,) G) V6 H- t0 ]) a
KILLED,0 E; r" A: ]" A' @+ R
}+ O# X7 ^2 Z4 @- h1 _
}
- K% |7 b4 C; S9 @2 T3.5.4 状态机可视化
5 M! p8 u* u$ }2 w2 g0 }+ {6 N' zYARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImpl、 RMApp-AttemptImpl、 RMContainerImpl和1 @. h4 `: u3 p* h* u6 ~' d
RMNodeImpl, NodeManager中的ApplicationImpl、 ContainerImpl和LocalizedResource, MRAppMaster中的JobImpl、 TaskImpl和
7 Q4 F' g; x2 ETaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操
+ Y! w9 A% p& E! H作步骤如下。. ]1 D6 \, Z; W. ?
步骤1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:3 d3 N7 m# v( ^3 F* j: R' m
mvn compile -Pvisualize: `! O: g* U: \3 R p* h
经过该步骤后, 本地目录中生成了ResourceManager.gv、 NodeManager.gv和MapReduce.gv三个graphviz格式的文件( 有兴趣可以& a7 n) W+ i4 S* S* C( |
直接打开查看具体内容) 。: w2 J5 F. Q* D: I3 m. N
步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:
/ d3 U' p. |: F. n5 edot -Tpng NodeManager.gv > NodeManager.png
& Y. ^7 Y J" n H" G1 k* X如果尚未安装graphviz包, 操作该步骤之前先要安装该包。: {; D) d& B) T: `0 k8 p' C e4 ~
注释
3 A& x/ j: L! E* I0 Y[16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930。
+ L# A7 j" ^% z% j0 H, ~
& ?( [; K& z: B* y0 c% e) y
+ A0 H k/ o3 ]/ ^6 p9 v |
|