java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3742|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.4】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66083

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:58:22 | 显示全部楼层 |阅读模式
    3.4 服务库与事件库7 O% X8 G# y5 k3 p
    本节介绍服务库和事件库。
    4 L: @6 u) h& H9 A" L4 ]. U
    3.4.1 服务库- I6 s2 E: R. z' D
    对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。
    & N+ s/ Y7 y" R  m: K: y
    ❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、
    6 X9 p- |0 X& K6 ?% {
    STOPPED( 已停止) 。; k0 Z5 j/ t1 q9 ~
    ❑任何服务状态变化都可以触发另外一些动作。
    ; K* P* D8 S5 W1 p8 f, L
    ❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。8 z' Z( W  g, ~( r, Q- \7 K6 p! H
    YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务
    ) F+ W( a: p6 [$ n+ @8 Z9 C对象最终均实现了接口
    Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的
    ; |3 S5 p+ \9 P. a; k
    Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于6 [" k4 M) ?, x
    ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMServiceApplicationMasterLauncher1 d2 I- X7 C. p! f- q! h
    ApplicationMasterService等。
    0 `2 ]- G& z" u% [" `# x
    3-13 YARN中服务模型的类图
      ?; R+ n% s9 z8 f: k9 W' r
    YARN中, ResourceManagerNodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服2 {0 Z9 N4 H+ q- D; D
    务的统一管理。
    2 \9 S+ F3 h9 P
    3.4.2 事件库
    % r8 z# o6 M; n1 \, kYARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN: t' K! a, u1 Q0 v( a
    各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。
    YARN中的事件处& D7 t/ E& o7 k5 E; S* f3 Q& `
    理模型可概括为图
    3-14所示。
    ' X$ u  d; }( [% L7 u6 h
    3-14 YARN的事件处理模型% F+ Z* v$ @- a. @7 _; S
    整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器(
    Async-Dispatcher) 负责传递给相应事件调度器
    6 P' ]) a( J8 A, r% b4 D
    Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其3 C9 b  T+ Q  N: N6 C6 x
    处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成% j& n; j0 `% J, L/ Y
    ( 达到终止条件) 。; ^! m  e4 X0 ?
    YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManagerNodeManager6 H" y, H2 t1 D) P4 m
    MRAppMasterMapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型4 e% `4 y( M# O
    驱动服务的运行。4 k3 O$ x9 P+ x5 g8 u4 p5 w) |
    YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先
    6 B: m' \: [2 B0 v! Y4 i要定义一个中央异步调度器
    AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器# A% e: x6 B5 I& ~' u- z  q+ u
    EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器: u% K& w7 ^! Y/ A5 ^" j2 E
    AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImplTaskEvent/TaskImplJobEvent/JobImpl等一系列事件/事件处理器, 由
    , @$ R# T8 Z/ b; Y1 y$ j中央异步调度器统一管理和调度。
    ' d7 P. z; B0 s8 M4 P* ~9 J8 F5 {- b服务化和事件驱动软件设计思想的引入, 使得
    YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间& E3 @! F4 ]" C# h, L% Y
    则采用事件联系起来, 系统设计简单且维护方便。
    & r3 {5 Q1 \1 J5 \% L# M+ C
    3-15 事件与事件处理器/ c4 A- i# p$ C+ A( O
    3.4.3 YARN服务库和事件库的使用方法
    3 g$ E2 Z( R& U! m- B
    为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce7 n; V+ S: j$ N  s- B+ C6 \
    ApplicationMaster
    MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
    ) a, X0 v- B. n9 I) q: Q* Q; ~( @
    1) 定义Task事件。
    ' T* B6 i) _; n
    public class TaskEvent extends AbstractEvent<TaskEventType> {, Y- C9 c- V: N4 s* W% H, n  {" ?0 ]
    private String taskID; //Task ID! a  A+ ^" G) @8 G5 C' D2 q# R
    public TaskEvent(String taskID, TaskEventType type) {
    0 Z0 Y  w! l, t6 `super(type);8 F0 m( w% E. H" [
    this.taskID = taskID;
    ' [4 \' w: a* }( h) o! l1 P} p# R: f2 _5 {! d( c5 @
    ublic String getTaskID() {
    ; R0 H/ g* {9 o) o, kreturn taskID;% \( b& O2 N; N4 A
    }
    1 Z- x4 V3 i" g2 Q中,
    Task事件类型定义如下:
    . H$ c1 U1 U. p& a
    public enum TaskEventType {6 f6 _9 v0 p; T0 q$ u/ i% A
    T_KILL,
    ; C- K; {0 X1 }  G5 z# J6 X+ f9 X% n, cT_SCHEDULE6 ]4 t! s" Q: a# r* @1 M
    }
    ( [, q( G+ J+ }  _9 m' l  k2) 定义Job事件。
    ! ]5 s( p" h' ^, B9 s- Z
    public class JobEvent extends AbstractEvent<JobEventType> {3 e  [# X8 B: X; n
    private String jobID;, V6 K8 `! H2 C: V1 A; N8 i: R
    public JobEvent(String jobID, JobEventType type) {
    6 h/ q. Y9 V. {7 t: ^5 ]* Wsuper(type);
    3 q/ y% H; Q9 e& _4 H! Gthis.jobID = jobID;
    # N0 B; t/ n# _. N. a' m8 E} p
    - h# @* v$ ?& Z& p3 a% E, f2 Aublic String getJobId() {
    4 D: @' x8 U/ o  {( ]2 D% ^5 f* ^return jobID;
    4 J5 w( G/ F2 K+ j) z}& J% K+ t" P# D( U
    }
    : m/ o0 k  H$ y- ?8 `其中, Job事件类型定义如下:: d7 F) A5 {. O" N( {
    public enum JobEventType {
    + S* W9 z2 N6 gJOB_KILL,
    " d& B% P3 l7 L7 a3 e, b6 G' ~5 ]" iJOB_INIT,4 n/ b" q  z! y5 c
    JOB_START! U1 ~& D: N4 C" d# U; W
    }* q0 S  p4 [; o$ H5 L2 B) o8 G
    3) 事件调度器。
      w( {3 F2 ~' k) n* g接下来定义一个中央异步调度器, 它接收
    JobTask两种类型事件, 并交给对应的事件处理器处理, 代码如下:
    2 k- J& ?+ ~/ o@SuppressWarnings("unchecked")3 ^; ~, V! w7 |- s' O
    public class SimpleMRAppMaster extends CompositeService {2 S; p; p: U' |1 l* B1 u" {/ w+ H
    private Dispatcher dispatcher; //中央异步调度器
    # O) ^, H" R, y- {2 x/ I
    private String jobID;: \! p  v3 x  ^8 k6 l' S8 _
    private int taskNumber; //该作业包含的任务数目
    " V# T, g1 }* V% G
    private String[] taskIDs; //该作业内部包含的所有任务
    & T) G1 J/ ]8 S2 W/ i
    public SimpleMRAppMaster(String name, String jobID, int taskNumber) {" A3 n' }# d8 R
    super(name);
    : g# d$ V! C' i3 tthis.jobID = jobID;* _4 }) k7 e( d: e
    this.taskNumber = taskNumber;0 Q, y, M7 }+ e
    taskIDs = new String[taskNumber];1 }; ~8 G% Y' z% k+ U/ u
    for(int i = 0; i < taskNumber; i++) {* l3 D! Z/ z9 T2 n! h# D, n
    taskIDs = new String(jobID + "_task_" + i);9 m5 O) d& e# G' ?6 J+ z  d# D
    }9 {) l) {, z5 i6 Z' Z( w
    } p  P! A- t9 G; Y& o, X: H+ e3 y3 z$ Q# u
    ublic void serviceInit(final Configuration conf) throws Exception {
      x' }& e" l7 q! @dispatcher = new AsyncDispatcher();//定义一个中央异步调度器0 Z1 [( e! I# s; ]7 g
    //分别注册JobTask事件调度器
    1 `5 _* E7 F. F! P7 k$ ]
    dispatcher.register(JobEventType.class, new JobEventDispatcher());
      P& U3 s* ]' u0 |. Ddispatcher.register(TaskEventType.class, new TaskEventDispatcher());1 X( `% x9 P1 J2 {, H7 Y" Z" P+ a. t
    addService((Service) dispatcher);  V4 Z" p$ h% S1 ^; d$ M
    super.serviceInit(conf);
    $ {" k+ N7 P/ ]( Y7 i8 g# M} p
    0 V# x" M0 ?' t) B. }1 fublic Dispatcher getDispatcher() {
    4 |, T8 j! T! t! D. freturn dispatcher;3 V1 z9 K8 c* W9 X# l
    } p
    ( z1 x9 s7 T& Arivate class JobEventDispatcher implements EventHandler<JobEvent> {8 S0 w8 Y9 d$ L+ T" m& @
    @Override
    9 M6 d! ~" ~/ V+ zpublic void handle(JobEvent event) {
    0 [( X6 x4 W6 k8 kif(event.getType() == JobEventType.JOB_KILL) {3 u: I( ~- y" \
    System.out.println("Receive JOB_KILL event, killing all the tasks");
    8 O$ _% H* s% P; c# D8 F: p$ hfor(int i = 0; i < taskNumber; i++) {
    ! }% E, i3 O1 K, Rdispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
    # A0 o: T7 o& S: I  ^TaskEventType.T_KILL));
    4 o! g% ?( z/ [}
    ; Z% D4 w, A7 H6 `2 k" W3 c1 K} else if(event.getType() == JobEventType.JOB_INIT) {
    2 e* Z8 |+ F8 O& Y/ nSystem.out.println("Receive JOB_INIT event, scheduling tasks");
    ! n* _( q! u( L. a: Q  Jfor(int i = 0; i < taskNumber; i++) {
    4 `' y$ ~, R* X8 Idispatcher.getEventHandler().handle(new TaskEvent(taskIDs,4 H8 ?6 T) C% i6 q  _4 Q
    TaskEventType.T_SCHEDULE));
    . p$ M% c1 I1 O, _}
    8 b. F& |. A; O5 M6 [1 S/ {}; s, M' T+ h  ~, a
    }
    : e% @0 l6 e9 ^}p
    , _' x/ I  R# k& Q: D' grivate class TaskEventDispatcher implements EventHandler<TaskEvent> {
    / ?5 b1 R( Z% Z@Override( P6 v8 h$ z* u6 Y4 K( |
    public void handle(TaskEvent event) {
    5 }& H$ s5 A* D, D: T( X% Y+ nif(event.getType() == TaskEventType.T_KILL) {
    ! n  P% @' A5 r# ^3 TSystem.out.println("Receive T_KILL event of task " + event.getTaskID());
    0 c$ L( e8 Q3 r} else if(event.getType() == TaskEventType.T_SCHEDULE) {' f/ t* P2 X! M, G3 d4 J( N
    System.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());6 f: n! _; D2 ^1 S
    }
    2 Z, S; t3 w9 p}" m0 @$ k" @+ h- T( e: ?& e; j2 P
    }
    % i1 I7 `; p8 `+ ~! ~0 ^- }5 n0 k  E}" h% F7 ?) Z  n
    4) 测试程序。( O+ g! F' Y- ?# q0 ?( h& j3 ~
    @SuppressWarnings("unchecked")
    2 i# }& ^) p9 `( g% N5 u' Jpublic class SimpleMRAppMasterTest {2 ]& }) G; [% F6 r" G3 t3 S
    public static void main(String[] args) throws Exception {
    0 A( m+ \' o0 Q. P1 a7 V) C) uString jobID = "job_20131215_12";
    . j6 d! A3 I( g6 zSimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);/ L0 d3 d8 t5 S1 B% ?  `
    YarnConfiguration conf = new YarnConfiguration(new Configuration());
    2 G* s* m4 i  ?$ B. Y2 n: h8 CappMaster.serviceInit(conf);
    * i5 |, d! A7 f; IappMaster.serviceStart();" o! v/ W: m5 k! m$ P
    appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,  g# F' w9 g. g4 p' J/ E
    JobEventType.JOB_KILL));
    & m9 v. [9 ~- \0 n) C3 u* b- AappMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
    % T6 N7 _( B: W) ^2 m# y& tJobEventType.JOB_INIT));
      v# ?* {$ P: g5 ^/ U% P}
      [0 ~9 z; W( I; i3.4.4 事件驱动带来的变化# E. C; {( @* j/ N( X" H0 @) @
    MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的
    5 P+ h2 o( t$ o4 z2 r8 s方式, 且整个过程是串行的。 比如, 当
    TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
    5 ^6 y; @0 y! t3 y字典文件等) 、 然后执行
    Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
    7 b" p4 k7 O. s. a0 `2 @是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一$ V0 `. I1 Y/ {; ]- G) z
    个独立进程运行该任务。 尽管后来
    MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决; |; V3 `/ ?6 u# c- Y9 X
    问题之道, 必须引入新的编程模型。
    2 G; y$ u- e! j3 U& x! x5 m
    3-16 基于函数调用的工作流程" ~' ^# t: t9 V- I7 l  d
    基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下,
    MRv2引入的事件驱动编程模型则+ [3 y. J. i! C; b
    是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关5 a. \1 q* Q6 i( k9 N! X
    联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图
    3-17所示, 当A需要下载文件时, 只需0 Q8 R0 Q7 p' R! h" E% P
    向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理9 ]% S$ q" D- G2 x
    B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A& @' j9 s. u/ D0 [
    3-17 基于事件驱动的工作流程* _, v0 X$ Q& O: z  o8 q2 W: Q+ i  @
    相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。
    2 C4 G, A, W; m4 u; l5 h* d5 }
    [15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705  1 j- e1 |4 w* L# _; b
    * e% l: ^4 s/ k0 [! K9 i" M
    7 ^. x* T2 V2 h0 f% U
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-4-20 06:42 , Processed in 0.105758 second(s), 31 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表