java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3003|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.5】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66093

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 14:04:29 | 显示全部楼层 |阅读模式
    3.5 状态机库) e7 ]2 e" L- l3 {) n9 {, x$ ~2 k( [4 R
    状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中- D2 Z3 H* p! D  q( K
    间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
    ; d1 R) H: M+ ]3 o态。 当状态机转换到最终状态时, 则退出。
    & ?  D6 L6 j, w9 t
    3.5.1 YARN状态转换方式
    ) m: _2 T7 U7 G% D/ X5 l
    YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和9 b3 ?5 A3 H$ t* m+ E
    回调函数(
    hook) 。 YARN定义了三种状态转换方式, 具体如下:
    : d0 H/ N, ^" `
    1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
    7 p; y$ J) o! w3 p9 ~! B函数状态转移函数
    Hook, 并在执行完成后将当前状态转换为postState4 x( ~3 r- E; W- A4 c9 t" [
    3-18 初始状态:最终状态:事件=1:1:15 g+ z6 A3 B4 Y# Y' W8 B% F
    2
    ) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
    * ]6 B6 ^7 n4 L, y1 Z3 J% U: ^0 x函数状态转移函数
    Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。
    2 U* U) K, Y* R: F2 e( c2 ~, v) m
    3-19 初始状态:最终状态:事件=1:N :1( N* {4 T+ |- k% F+ E: d! K* ~: A
    3
    ) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1Event2
    # d* j0 [0 a, l: a* B( f
    Event3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState
    & ?) g3 [% X9 T; X3-20 初始状态:最终状态:事件=1:1:N
    8 y( h& f8 Q2 G
    3.5.2 状态机类& B  ^7 U/ g5 Q
    YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了
    # @) I9 P5 g$ M( d* m& f3 ]一个状态机工厂
    StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调
    3 @5 K" S0 }7 g; U( C3 l3 b. U. E
    installTopology完成一个状态机的构建。- z" O2 X% L/ H( m4 S) X3 t* `
    3-21 状态机类图
    9 P  h2 Y4 s1 W% ^: s
    3.5.3 状态机的使用方法  E( i5 r) ^9 g5 N& R6 j4 x
    本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态
    6 F2 Z- U7 K8 t% ^' `, @变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度8 H+ s, Q; k) j- K( E! a$ N
    器, 可以嵌到
    3.4.3节的实例程序中运行。% U% U/ C8 ^* v. g6 J- g; n( D/ r- S
    1) 定义作业类型。
    ! o" E' c' A6 z6 Q
    public enum JobEventType {
    5 p) W% U5 Z! }9 _# C. P0 F0 \JOB_KILL,
    " `# V: e' V" a- k% rJOB_INIT,4 p' |/ v+ }. ]3 `  ^3 R$ D
    JOB_START,* ]" z" V0 Y9 v
    JOB_SETUP_COMPLETED,
    # x1 e7 V* C& U. o! D! Q& g9 k! A' ]JOB_COMPLETED
    6 K; S, w- b% [}
    5 q  I% w3 C* J. i2) 定义作业状态机。
    6 Q/ t* d! O" r' {
    @SuppressWarnings({ "rawtypes", "unchecked" })
    ) Z+ ?- Z1 O- ^7 opublic class JobStateMachine implements EventHandler<JobEvent>{" X( C& s! p. |( x9 J
    private final String jobID;
    6 t  ^: ^3 s6 g3 E+ l# I6 Kprivate EventHandler eventHandler;* \& p4 S* k7 Q
    private final Lock writeLock;
    * R6 b6 m$ U8 |# }" [private final Lock readLock;- j, b) p! R# J1 Q. u
    // 定义状态机
    ; d2 H% j* |2 P6 r6 {* `
    protected static final" n7 W1 Y& e4 E/ C5 A
    StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>" x) e) B0 n. G- y+ v
    stateMachineFactory- K/ N, O2 u$ F, x% S
    = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>
    9 U' P* P2 k0 \6 l/ b( K6 O" l/ A5 t(JobStateInternal.NEW)/ S) W! `( l. v8 V
    .addTransition(JobStateInternal.NEW, JobStateInternal.INITED,* G$ B5 Q/ o! c3 c+ o* w
    JobEventType.JOB_INIT,
    , k+ w8 {7 j3 }4 i! y$ h( mnew InitTransition())
    ) u- C  w" j+ Q7 m1 X4 d.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
    / b" w* S4 I5 @4 s4 [; o" gJobEventType.JOB_START,
    + y7 U6 @9 a, G3 onew StartTransition())
    + [/ T; E9 I6 I) \.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,  U' Y/ X- z( B( R( ^# C
    JobEventType.JOB_SETUP_COMPLETED,- C% r' I2 u- w- n% [2 j( Z
    new SetupCompletedTransition())
    5 {+ m) E$ U6 Y& f( Q# H.addTransition' n, x8 e9 r6 I6 H6 B- f
    (JobStateInternal.RUNNING,: \# q3 Q3 t2 J. j, q8 S/ x
    EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),) P# [" {- |% Z& g: O$ s
    JobEventType.JOB_COMPLETED,* y. q3 H. Q5 ]2 ?4 p
    new JobTasksCompletedTransition()): c! }5 q3 i& r
    .installTopology();
    # D! s( i: Z* C- f' nprivate final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
    7 c' D2 r8 o. ]: t1 q# Z5 [public JobStateMachine(String jobID, EventHandler eventHandler) {, Q7 \: H; q! {; G; I% _
    this.jobID = jobID;
    # T4 x! {& I; W: G+ N7 cReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    7 ]) R! r, V) G0 S0 rthis.readLock = readWriteLock.readLock();/ J4 T8 J/ k8 a5 n5 K
    this.writeLock = readWriteLock.writeLock();
    ( h' p; n" v* r$ U* a; {this.eventHandler = eventHandler;3 A% c; I& W& y0 t+ n! b
    stateMachine = stateMachineFactory.make(this);
    * e) @5 c+ p# w/ @}p
    / F. N) r1 Y  N$ U% j. m5 K! irotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {! ]; H1 k  l; A& @
    return stateMachine;
    0 j8 a5 d0 B+ o: O& t9 o} p
    % ?; t2 N0 y& ?+ L% R: Z9 uublic static class InitTransition
    1 n  Q6 W/ @' h' d+ wimplements SingleArcTransition<JobStateMachine, JobEvent> {/ c2 C# b, w1 k8 i' z" a2 i
    @Override  ]; I+ @8 @* R5 [0 Q( h/ k2 {% d" e3 M0 C
    public void transition(JobStateMachine job, JobEvent event) {  t. ~8 T9 u1 i* F
    System.out.println("Receiving event " + event);
    - a8 L% \2 E* V, R6 }; qjob.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));
    - n  `9 \2 i8 R# j}
    7 N- m7 @) [, M} p: h" S! q8 T3 s, @  E* f
    ublic static class StartTransition- o: @5 m9 C5 _& c. J( U6 A; l' U
    implements SingleArcTransition<JobStateMachine, JobEvent>>{
    ( [( X: I+ r" U( a@Override% j# R0 S2 x5 m
    public void transition(JobStateMachine job, JobEvent event) {
    4 t5 N7 L- D6 \" [5 s) D7 ]5 fSystem.out.println("Receiving event " + event);
    , L* d8 b  O" E: M% p1 _' fjob.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));5 K, `3 I4 q2 ^3 u, L" F
    }
    1 |6 G; s' n' T! Y1 _}…/9 h* @7 ]8 E  g* A% M
    /定义类SetupCompletedTransitionJobTasksCompletedTransition
    8 X2 l- t+ f2 Q@Override
    0 v  c1 {. t5 J4 [: C! q: i8 B+ |public void handle(JobEvent event) {
    ! I7 F! h/ ~7 i8 k. i" }+ ztry {
    4 e' \* m8 Z% H6 JwriteLock.lock();. ?# M" n+ t* o7 J! V3 O1 }7 K" E. C
    JobStateInternal oldState = getInternalState();
    2 z( A% v' h! J% t: |try {
    0 c: S  F" z. Z8 XgetStateMachine().doTransition(event.getType(), event);$ x/ H  f/ e  J- K
    } catch (InvalidStateTransitonException e) {1 J% H5 z3 T, ?% W* S% A
    System.out.println("Can't handle this event at current state");1 _( {7 C, a2 `5 y7 b
    }i
    / T9 t8 X- I! B% \+ @f (oldState != getInternalState()) {/ J' l6 x; }. F  r: Y) e
    System.out.println("Job Transitioned from " + oldState + " to "
    # e+ L2 J. h) ]- g+ getInternalState());1 B8 C, {* y6 b, M- t" _2 u
    }! I; R+ @* V/ l* S$ D3 _6 m0 q
    }f, L+ n6 i( a! ?8 h
    inally {4 L7 Q. u6 l/ M' {, U8 a% \0 G
    writeLock.unlock();
    ; T! q4 R( {% O5 \}0 h' h1 g  G+ i/ N0 i! f
    } p0 g9 T6 m$ L, Q9 ?. q( o5 \
    ublic JobStateInternal getInternalState() {* |$ \: ?! ^1 Y* }9 B% {
    readLock.lock();* b% u. E- c$ L4 ~7 N! F7 R6 \
    try {- \( U% d) ^% w0 z' C2 x. ]$ f: J
    return getStateMachine().getCurrentState();1 Y8 b( }; r- t1 w5 _
    } finally {; a% W" `( _7 Y2 H
    readLock.unlock();
    & Y% w  y5 ^! r/ b, K7 N}5 H5 _* W; U  U! q+ J# p7 e. I
    }p( g' o1 f) b) B& r
    ublic enum JobStateInternal { //作业内部状态0 L' A7 X" X2 U6 }
    NEW,
    $ C' R4 ~) o5 o$ f  `0 LSETUP,
    " F" T8 f/ k4 g/ S6 g$ fINITED,
    # n# K; M3 [  F7 _8 z6 d3 ORUNNING,
    * M4 }* z5 M* @" @3 fSUCCEEDED,; X9 J& Y; ^( s/ i5 h5 E
    KILLED,
    9 p" e* u+ l, G  i}; i' N8 s6 k) b# ]: C& Y6 W/ o
    }
    7 }2 L* W3 }! e9 [6 N3.5.4 状态机可视化/ o) i4 ]. k5 a8 L1 B9 r
    YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImplRMApp-AttemptImplRMContainerImpl
    . y# ?# d* l$ D
    RMNodeImplNodeManager中的ApplicationImplContainerImplLocalizedResourceMRAppMaster中的JobImplTaskImpl0 ]" {- Z5 W- I) e' @, Z7 X' A
    TaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操5 u  g6 z2 ^, O
    作步骤如下。
    5 S1 R( V4 z$ O6 J步骤
    1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:. B5 j, K  v6 s  Q) [
    mvn compile -Pvisualize
    9 ^, x' o8 c  n经过该步骤后, 本地目录中生成了ResourceManager.gvNodeManager.gvMapReduce.gv三个graphviz格式的文件( 有兴趣可以& a; i3 b' ~% U7 I% H
    直接打开查看具体内容) 。

    5 g: r& i& c9 ?步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:
    ( v* O- N: |+ G  C! W9 s9 ~+ S# |
    dot -Tpng NodeManager.gv > NodeManager.png
    5 Y7 r" Z' @6 m8 p如果尚未安装graphviz包, 操作该步骤之前先要安装该包。! z" L' n: @: u9 x/ I5 r9 w0 f
    注释& U: z, W. k0 y+ p7 i! r& G* N
    [16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930  
    + T8 \$ m4 x% }+ K$ A, T! V
    ' Q  y$ H1 C$ W/ M% d+ o, L" f& p; S8 A
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-4-25 14:34 , Processed in 0.100064 second(s), 31 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表