java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3760|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.4】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66101

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:58:22 | 显示全部楼层 |阅读模式
    3.4 服务库与事件库/ K3 I% ^' Y$ D6 Q& I( Z( b' V
    本节介绍服务库和事件库。
    3 z, b0 A8 f, D8 f! z' d% |: l
    3.4.1 服务库
    * [6 o3 A9 V. F- G7 V3 C/ J2 ?
    对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。
    / ^6 d' @6 U3 P3 O+ X3 E: s
    ❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、% {) u3 E. w. O" T
    STOPPED( 已停止) 。  u& ?$ H6 N! W, }3 I+ z2 e
    ❑任何服务状态变化都可以触发另外一些动作。3 s1 [3 w# n: b9 c6 v! i: `
    ❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。
    ' \: ^  G; {! n& c" k
    YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务2 c. G9 V5 P' u- q# [
    对象最终均实现了接口
    Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的5 {0 S5 d# Y) s, k& a& ?5 _
    Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于3 }" P! i) F7 ^! b; m1 y* [
    ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMServiceApplicationMasterLauncher
    0 _3 Y: s/ C' d, g
    ApplicationMasterService等。% M# T# }  M  J& w/ c( A$ y
    3-13 YARN中服务模型的类图
    ! g( X7 }6 d( j! Z0 V; Y
    YARN中, ResourceManagerNodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服# x  W, O5 o" d  T* E+ P
    务的统一管理。" F# G9 z8 ]0 w: k& _$ [/ L
    3.4.2 事件库1 E% V6 f6 O" O1 s5 J) ~
    YARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN
    * n% Y, f$ e4 B4 y2 n: v1 [/ ]5 _各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。
    YARN中的事件处
    0 }7 w/ q& O  G" a) E& `理模型可概括为图
    3-14所示。! Y  J! A0 ]8 [- e# ]; K2 U! z$ D
    3-14 YARN的事件处理模型
    6 K9 `% N% I0 X& t; R5 N* `整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器(
    Async-Dispatcher) 负责传递给相应事件调度器0 L* ^# C  D: u0 x
    Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其/ u6 }$ |( P( ]' b3 W" i
    处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成+ U/ _9 Y4 {* W8 L( L
    ( 达到终止条件) 。
    - j% Z5 \+ N7 P6 d; d- _; H
    YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManagerNodeManager
    8 L/ X  o0 Z- `7 C5 [. q! e
    MRAppMasterMapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型
    8 `& W, Z+ f0 `" E' t驱动服务的运行。6 }% y6 r2 h6 F# `  u0 f
    YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先
      R0 y6 |  M/ w; {* m- {5 A9 P要定义一个中央异步调度器
    AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器, S- V, @+ s7 F
    EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器
    , X; b7 o  e) W' B2 q2 F/ s- F
    AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImplTaskEvent/TaskImplJobEvent/JobImpl等一系列事件/事件处理器, 由
      g( I  m: _& x& f中央异步调度器统一管理和调度。4 O/ h. `, w( {* f" K/ i
    服务化和事件驱动软件设计思想的引入, 使得
    YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间
    4 Q: ?' z' Z  m- L8 I则采用事件联系起来, 系统设计简单且维护方便。

    ( [" L2 `# Q& n) Y, J3-15 事件与事件处理器4 o& J. c  g1 k/ g/ h  y, J( z
    3.4.3 YARN服务库和事件库的使用方法: g( {8 g' E* i7 t( J- Z  ], W
    为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce
    5 O. h5 {: d  a9 f  ]7 u2 {ApplicationMaster
    MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
    ' B, s% {; S& X1 `, [
    1) 定义Task事件。
    4 ?8 W, t, c* ?8 E( V
    public class TaskEvent extends AbstractEvent<TaskEventType> {
    , _' o: k. h: U9 n5 Lprivate String taskID; //Task ID
    % T! f. `/ Q7 |) d# U2 P7 Apublic TaskEvent(String taskID, TaskEventType type) {$ g( ]5 T& r5 Q2 l. l) c- M
    super(type);, L1 E0 @( \+ ]' g- j, e
    this.taskID = taskID;) g  f5 X1 ~" Z- c, f3 o, X6 ~
    } p
    * A9 V  u' m" O! o( nublic String getTaskID() {$ n" z2 W4 _9 j: @- Y1 h
    return taskID;$ |. ?( a% n. G- x) `4 E
    } ) @  D' _4 F0 S
    中,
    Task事件类型定义如下:* K; G3 g0 h0 s5 Q
    public enum TaskEventType {$ L. x( n6 K0 w9 s
    T_KILL,! |! O- L4 l& W
    T_SCHEDULE  a) a5 E; W" q8 d+ p7 j: `
    }
    ( p3 `8 W( Q( u6 o2) 定义Job事件。
    ! \0 [! [' ]) Q
    public class JobEvent extends AbstractEvent<JobEventType> {
    ( e! N' D& b' u: q8 P# M6 i; tprivate String jobID;7 r& q# @0 Q3 C* G! c
    public JobEvent(String jobID, JobEventType type) {
    ! E6 Z' H# \' b+ U3 m5 Vsuper(type);3 O$ P" ]( W4 G
    this.jobID = jobID;9 V9 \* z+ q2 X- o
    } p
    2 q1 A! o2 @. @7 [  \$ Bublic String getJobId() {
    , T1 U" {( S8 freturn jobID;
    ; E, L, B4 b5 ^% B* q4 J}
    ( O; u! D0 v0 g3 Q/ u}
    " h* y9 \( d5 U( f( O" w其中, Job事件类型定义如下:
    9 {8 j0 I$ a8 T2 K) O; z
    public enum JobEventType {
    / b# {4 D$ b, Q! C# Q; _JOB_KILL,
    & c2 s4 k) Y6 _/ ]! b4 ], cJOB_INIT,
    $ Y& T; y  h3 F9 W$ N9 y& PJOB_START6 A( S3 b7 u% O8 j6 C
    }7 N* [+ y3 B; `) s
    3) 事件调度器。
    ' S8 ]- v8 C) o- p3 D5 n- e接下来定义一个中央异步调度器, 它接收
    JobTask两种类型事件, 并交给对应的事件处理器处理, 代码如下:
    # ^( c8 G5 w+ X$ R, a% O, n@SuppressWarnings("unchecked")
    0 M# a% j0 a7 z$ J& Y6 Z* }public class SimpleMRAppMaster extends CompositeService {
    - Z& S& n9 `, E0 _  Q% w7 x& iprivate Dispatcher dispatcher; //中央异步调度器
    1 J1 v2 g+ u9 h, ~& _
    private String jobID;  V1 ~" q) u. K. w" h9 u5 f
    private int taskNumber; //该作业包含的任务数目2 e5 s. l1 S/ j  T8 Z' h
    private String[] taskIDs; //该作业内部包含的所有任务
    % C6 e1 @# O1 L$ b( B7 v
    public SimpleMRAppMaster(String name, String jobID, int taskNumber) {
    7 ~6 y/ v- f  o9 Q/ m4 r5 lsuper(name);4 s9 Q- P) p6 u. a
    this.jobID = jobID;% m6 L4 y$ g& k( x- F
    this.taskNumber = taskNumber;
    0 B% z, B1 R6 FtaskIDs = new String[taskNumber];9 w3 \0 K% z: h
    for(int i = 0; i < taskNumber; i++) {
    " r% K) K  v/ otaskIDs = new String(jobID + "_task_" + i);4 I8 g/ g% ^" O4 u& U7 [
    }1 f  X- ?7 _0 j4 S2 @! [' [
    } p
      ^+ F' O, b4 ^# s% v- R  Eublic void serviceInit(final Configuration conf) throws Exception {7 \: O) ]8 ^% `2 V
    dispatcher = new AsyncDispatcher();//定义一个中央异步调度器" W* o7 s: P1 r- E* K8 r
    //分别注册JobTask事件调度器2 B- i3 @+ N! U) p, B1 K
    dispatcher.register(JobEventType.class, new JobEventDispatcher());1 v" Y( p1 S; x5 j
    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());* a" Y/ D+ v6 R, m
    addService((Service) dispatcher);- Z5 R3 D( j6 ?) w) j
    super.serviceInit(conf);2 i! H5 h9 J2 Y5 v& \
    } p
    ! W# m9 O' y/ y3 A5 Hublic Dispatcher getDispatcher() {
    + s( B! d- J7 r) f: }: Z2 u9 _return dispatcher;8 C: b! C+ C# ^, n3 t
    } p' [2 w" E$ I' S7 k& l
    rivate class JobEventDispatcher implements EventHandler<JobEvent> {
    - A, q; C, T3 b+ k@Override" m, h& X/ r+ a8 K: X9 K
    public void handle(JobEvent event) {0 J8 B# w9 ~7 `( m! q& \) |: F
    if(event.getType() == JobEventType.JOB_KILL) {  Q6 g( l$ {+ m8 E4 s# Q+ G
    System.out.println("Receive JOB_KILL event, killing all the tasks");7 O$ [! C# I. v7 G- _
    for(int i = 0; i < taskNumber; i++) {/ ^6 z' X# [+ @; {
    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
    ; I$ c& a) N8 zTaskEventType.T_KILL));
    : r2 S6 [; U( J' X8 s}4 m2 c, N$ [' `/ j
    } else if(event.getType() == JobEventType.JOB_INIT) {
    % T3 g2 O  \% j6 [; _0 W/ a% eSystem.out.println("Receive JOB_INIT event, scheduling tasks");
    / R, F! J# s' ?3 wfor(int i = 0; i < taskNumber; i++) {
    3 m! m( u  _: J; W# Zdispatcher.getEventHandler().handle(new TaskEvent(taskIDs,% g8 }, K! C9 `
    TaskEventType.T_SCHEDULE));
    6 v3 q0 r" f/ h}1 b- Y9 A  d5 G) O, Y
    }: {7 ^+ `7 _! c
    }
    1 Q1 `5 C- b5 I' @: A6 o}p7 |  p- N) D. l( p! d
    rivate class TaskEventDispatcher implements EventHandler<TaskEvent> {1 U. H/ n/ f6 r0 g9 c
    @Override
    1 `/ M7 s+ A) @/ G& a9 e& {# q# {2 \public void handle(TaskEvent event) {! U. y) _  x% A
    if(event.getType() == TaskEventType.T_KILL) {% X6 E/ j2 Y3 |1 f
    System.out.println("Receive T_KILL event of task " + event.getTaskID());5 i! g. v+ D& _
    } else if(event.getType() == TaskEventType.T_SCHEDULE) {
    - Y& B% B$ ~7 P4 J, VSystem.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());
    9 |* f! K, h5 I7 b2 F  N$ C' y}
    " D$ e5 \* Z% Q  k) q  N% y}
    , R" X. P' }4 M$ o1 M3 P- o}0 \. c+ j1 S' g  G
    }
    : k0 P; E: |6 k* @% m5 b8 L4) 测试程序。7 E" z" d! W  @4 z! f5 f& g0 k$ o
    @SuppressWarnings("unchecked")+ E2 M( x9 K2 k2 I/ W) g, r
    public class SimpleMRAppMasterTest {
    % x4 g% {( d2 |6 Apublic static void main(String[] args) throws Exception {
    2 H5 ~( d* f0 S) ]- DString jobID = "job_20131215_12";
    ; w: k' ]1 ^. i8 @" R* K6 H5 `6 `SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);
    * {5 w: v# @! |2 N4 x8 TYarnConfiguration conf = new YarnConfiguration(new Configuration());
    3 {* h/ \( U, h5 D- y$ M0 b) lappMaster.serviceInit(conf);
    ) B. C- _0 {3 G9 G& |appMaster.serviceStart();
    " J7 f: `, k. q8 W% X6 @/ UappMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
    * p; `" S, [3 z  \1 x/ c7 W6 nJobEventType.JOB_KILL));
    3 H2 l$ `9 ]6 \; i0 r5 h  ]3 I0 j# RappMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
    # P0 a1 \+ @) X' @) K  R$ SJobEventType.JOB_INIT));
    2 C4 ~/ u9 C* ?9 t! f! g" u8 K! Q}- _, A; F& h+ |2 k! [
    3.4.4 事件驱动带来的变化' \* p, V/ p; c# ?1 N
    MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的' J% ^- l( U8 J, R; m
    方式, 且整个过程是串行的。 比如, 当
    TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
    4 d+ W- j* @1 g# @: w字典文件等) 、 然后执行
    Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
    ( g' C( }( |+ S是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一- O5 k7 K+ S9 ^: }& |& V
    个独立进程运行该任务。 尽管后来
    MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决8 w- R% j( [! H
    问题之道, 必须引入新的编程模型。
    7 H- i" N# ~' e; o0 Z
    3-16 基于函数调用的工作流程6 V# Q: h, o7 o+ ~' \# J
    基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下,
    MRv2引入的事件驱动编程模型则
    % N, C7 g* p9 R$ j" [是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关; k5 _+ v1 ^+ w/ ^. `
    联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图
    3-17所示, 当A需要下载文件时, 只需
    5 A5 {& N) p2 G( v3 W! |" K. l向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理
    7 v/ I) \2 A- W6 k# O6 [
    B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A
    8 W0 I+ Y! P2 o# ?" l
    3-17 基于事件驱动的工作流程" j: t( E9 k9 ^+ p& {
    相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。
    : Y, B6 e8 @- V$ F
    [15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705  ' X, U3 u! ?) l, R+ @/ L

    ! C% p  W. v  D  E7 V7 c: w0 T5 H+ p
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-4-29 23:18 , Processed in 0.114092 second(s), 31 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表