java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3007|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.5】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66101

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 14:04:29 | 显示全部楼层 |阅读模式
    3.5 状态机库
    ( R! J- @/ V9 \7 Z5 ?状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中: b: E7 j# q: C% F" N7 P
    间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
    ! a+ p! R( _6 o态。 当状态机转换到最终状态时, 则退出。) z) {! }" ?% o; h
    3.5.1 YARN状态转换方式
    ; U# }; o& I  a5 N
    YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和
    : ], X! z" ~0 C2 |& @; w回调函数(
    hook) 。 YARN定义了三种状态转换方式, 具体如下:
    ; c) q. p7 M2 d/ [: Y
    1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
    ! o* l7 U$ Z& E: E函数状态转移函数
    Hook, 并在执行完成后将当前状态转换为postState
    ' X7 ?) i- ~) f
    3-18 初始状态:最终状态:事件=1:1:1* F6 z8 ^3 u9 b0 Q, t
    2
    ) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行2 x6 L- f& T% p2 O) y. B* o
    函数状态转移函数
    Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。- N& z4 Y1 ~3 w$ _2 t% Z
    3-19 初始状态:最终状态:事件=1:N :1
    ; N1 H$ Z4 W  X5 V3 R  j( N3
    ) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1Event2
    * [6 |' x$ x4 }9 D
    Event3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState8 U, u! g( V: f2 C
    3-20 初始状态:最终状态:事件=1:1:N
    . {; V0 S$ U$ h% h1 d. q
    3.5.2 状态机类
    3 I+ M% Y8 O+ b% h& R: @- I
    YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了
    " b7 b6 j0 m1 `2 @一个状态机工厂
    StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调$ ?; L6 A' T6 Y/ _+ M8 J
    installTopology完成一个状态机的构建。
    9 S2 W- S7 X1 j9 \+ ]7 b
    3-21 状态机类图
    # T% G$ g6 q" M+ C# t- `  a1 _
    3.5.3 状态机的使用方法
    4 b; w& D4 z" Y7 u8 q
    本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态$ y) p: @- [# T! F; g( g
    变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度
    8 R* W' L1 s2 i( }6 c* t7 N* Z器, 可以嵌到
    3.4.3节的实例程序中运行。2 [* o5 g2 K6 R* U5 Q
    1) 定义作业类型。
    & w: E0 M1 u' k4 {: n
    public enum JobEventType {, T3 W8 r, {7 l  Q& Q
    JOB_KILL,
    # E; c* B0 w8 U% G& N9 ^4 Q' S* n, f1 pJOB_INIT,
    ' o3 D) H$ j4 \1 w0 E& y5 v8 p7 ~JOB_START,4 ?/ ~! t. @: A5 j9 |; G. x
    JOB_SETUP_COMPLETED," e7 c( T( F, f" F& P$ I# `% h
    JOB_COMPLETED
    + q) f" d6 w! h, D+ X1 I}/ D7 O/ ^+ ?. P* s* o! i  q$ p
    2) 定义作业状态机。0 ~% A  J, p3 k6 K1 d8 i: H  C* N
    @SuppressWarnings({ "rawtypes", "unchecked" })% }9 d7 f+ E0 C- h4 {
    public class JobStateMachine implements EventHandler<JobEvent>{
    6 t" r; _( k( J6 \" Eprivate final String jobID;3 `3 p9 d' F3 V& g) O, p
    private EventHandler eventHandler;
    8 s# n; u4 O" a% z) }0 Nprivate final Lock writeLock;
    ; L* z4 H2 A2 ~4 k2 @1 Gprivate final Lock readLock;
    1 b0 m2 W1 O( H& _9 I" y8 z// 定义状态机
    : S" L( f( Y; \; ]
    protected static final& s4 }8 \' W' d/ C  j' k1 i- E
    StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>8 P: k" K& [/ v+ L3 O$ m- M# `
    stateMachineFactory) L2 p# Z1 a2 }
    = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>0 @- @7 i9 \0 n% w4 n0 c& F4 V
    (JobStateInternal.NEW)
    % m  ]) y/ L; ?) T- m.addTransition(JobStateInternal.NEW, JobStateInternal.INITED,! x) @6 ?4 t" u+ B
    JobEventType.JOB_INIT,
    3 D$ Z9 F$ O! G' j, F1 U" e" Tnew InitTransition())
    / R3 Z1 }& ~0 c; E  p3 a) m5 }.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
    4 M7 w' w- W4 c) O7 ]JobEventType.JOB_START,
    & o! i& @- s8 @8 t6 O$ D& enew StartTransition())) m& L7 q% [1 W6 b9 B' B% E
    .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,
    $ z7 S: P, N, ], @- L9 zJobEventType.JOB_SETUP_COMPLETED,3 w- k( M% R9 [& `% r
    new SetupCompletedTransition())
    : w% l# E. `) |8 r.addTransition
    / Y8 ^7 s8 e4 x* F) v(JobStateInternal.RUNNING,
    1 f7 q* Y  j& e( y$ I. aEnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),3 G3 W; J( a& ~/ V; J
    JobEventType.JOB_COMPLETED,3 b0 T4 S2 r  ^. e* C* ^7 W5 L. _* T6 ?
    new JobTasksCompletedTransition())5 f2 n6 ~+ @+ p* i2 R2 b* A( q
    .installTopology();! @/ R2 f9 a2 i$ l
    private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
    3 K% f/ h) ^/ C( z$ lpublic JobStateMachine(String jobID, EventHandler eventHandler) {
    . o% E8 y& o  T- x, ]this.jobID = jobID;
    4 M1 c$ V) D: B# I0 BReadWriteLock readWriteLock = new ReentrantReadWriteLock();: i5 w7 ]$ @- p% z" T
    this.readLock = readWriteLock.readLock();5 v6 }# h* C: Q! y) Y' K
    this.writeLock = readWriteLock.writeLock();4 ]+ {- C0 @- }) ~; ]3 y
    this.eventHandler = eventHandler;+ }; [! t, S5 d( y9 [4 @" F
    stateMachine = stateMachineFactory.make(this);  b1 D, \* X; D& j, M; h0 C
    }p, }4 C6 n$ _4 w7 a8 O0 ~
    rotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {! U" ?6 I$ [: a9 n8 i
    return stateMachine;
    + C/ t, a& n/ [} p
    - Q& i- l3 f! z, w0 uublic static class InitTransition; \. |" u' e1 {7 N! T
    implements SingleArcTransition<JobStateMachine, JobEvent> {
    : z& ~( z+ k3 O. W: d@Override; c5 Z) Y6 k2 w4 U/ Z/ L1 I; M
    public void transition(JobStateMachine job, JobEvent event) {
    9 Q1 u& {8 K  y( f; DSystem.out.println("Receiving event " + event);3 C& X, r) R6 A$ `' J: l
    job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));1 }( O* H* U0 w# {3 ~
    }
    : e' x7 ~; R  u" ?& t% r! x# t} p
    5 q6 W, S# E$ _# o9 zublic static class StartTransition
    5 L- U# _5 ?( I" g" C8 M6 ?implements SingleArcTransition<JobStateMachine, JobEvent>>{
    1 s) z2 _5 n$ l) L+ B& k! h@Override
    7 E* B9 W& K3 Q- Lpublic void transition(JobStateMachine job, JobEvent event) {
    % i% ^' h) I1 q1 @1 }7 l, hSystem.out.println("Receiving event " + event);
    0 l+ \/ u" @1 t6 F, C8 ojob.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));
    0 w( q2 x; d9 ?* J. @/ l/ z$ [}
    # A/ G% e6 T) F& f+ H}…/6 L$ M4 D; n3 @# z4 b' j$ K8 ^
    /定义类SetupCompletedTransitionJobTasksCompletedTransition! w5 e8 _" y+ Y. b% s5 z
    @Override6 B$ j5 R9 ]& }$ k7 Z4 Z/ q
    public void handle(JobEvent event) {: _: o6 I2 @0 E  v
    try {* U9 ^4 c3 R$ Y
    writeLock.lock();+ r" h! h8 n5 ^
    JobStateInternal oldState = getInternalState();
    . E  p! k' t& o/ S6 X! xtry {- o  d! T4 G8 R! @: B
    getStateMachine().doTransition(event.getType(), event);6 T+ v$ E/ k. g. x2 _7 C% t
    } catch (InvalidStateTransitonException e) {7 v0 Q3 `& h8 ^! J$ b; ~
    System.out.println("Can't handle this event at current state");
    / X% G, i& q8 S$ h7 L}i" r; D, ]4 h( }! w
    f (oldState != getInternalState()) {( f' i5 v/ ]: {
    System.out.println("Job Transitioned from " + oldState + " to "
    " n* ?' l- G- {) u* @( _+ getInternalState());
    % |5 e; u( y) K  Z! M}
    : C6 y( f4 W* {; Y2 ~5 @2 `) }}f
    / N4 H, }/ W9 T0 P4 I9 @- Einally {0 G+ B( h; d' Q3 v7 U6 X
    writeLock.unlock();  ~3 n  c* H8 h* J1 m, H
    }
    ' q' h6 n; q. F/ Y3 j9 |2 k} p
    5 g- \  g' g- o8 jublic JobStateInternal getInternalState() {
    8 ]5 T( ~; e( \: dreadLock.lock();
    6 w$ U2 Q2 J% }1 z# Htry {
    - `( I5 n* S+ `return getStateMachine().getCurrentState();
    8 {  L, x5 ]5 u1 P} finally {
    : D* e/ p- @9 ]' G$ \* e/ L$ h7 h* C# LreadLock.unlock();5 d: v+ _6 F( L2 O5 L' d
    }4 C* Z, G4 V% D1 y/ b3 z9 w
    }p: p. K4 \+ i0 ]$ S
    ublic enum JobStateInternal { //作业内部状态) `# h/ z+ w; t% D5 P
    NEW,
    . o5 e5 y. c) P; T5 u+ x8 `& |6 ASETUP,7 A2 K! U2 H" C( n; Y! i" E6 w
    INITED," |2 K( @. Q8 @( M7 Z3 {( {; v% r
    RUNNING,& m' T9 J% @; q( X+ i
    SUCCEEDED,) G) V6 H- t0 ]) a
    KILLED,0 E; r" A: ]" A' @+ R
    }+ O# X7 ^2 Z4 @- h1 _
    }
    - K% |7 b4 C; S9 @2 T3.5.4 状态机可视化
    5 M! p8 u* u$ }2 w2 g0 }+ {6 N' z
    YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImplRMApp-AttemptImplRMContainerImpl1 @. h4 `: u3 p* h* u6 ~' d
    RMNodeImplNodeManager中的ApplicationImplContainerImplLocalizedResourceMRAppMaster中的JobImplTaskImpl
    7 Q4 F' g; x2 E
    TaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操
    + Y! w9 A% p& E! H作步骤如下。. ]1 D6 \, Z; W. ?
    步骤
    1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:3 d3 N7 m# v( ^3 F* j: R' m
    mvn compile -Pvisualize: `! O: g* U: \3 R  p* h
    经过该步骤后, 本地目录中生成了ResourceManager.gvNodeManager.gvMapReduce.gv三个graphviz格式的文件( 有兴趣可以& a7 n) W+ i4 S* S* C( |
    直接打开查看具体内容) 。
    : w2 J5 F. Q* D: I3 m. N
    步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:
    / d3 U' p. |: F. n5 e
    dot -Tpng NodeManager.gv > NodeManager.png
    & Y. ^7 Y  J" n  H" G1 k* X如果尚未安装graphviz包, 操作该步骤之前先要安装该包。: {; D) d& B) T: `0 k8 p' C  e4 ~
    注释
    3 A& x/ j: L! E* I0 Y
    [16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930  
    + L# A7 j" ^% z% j0 H, ~
    & ?( [; K& z: B* y0 c% e) y
    + A0 H  k/ o3 ]/ ^6 p9 v
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-4-29 21:54 , Processed in 0.075466 second(s), 35 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表