java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 2751|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.3】part2

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66101

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:56:45 | 显示全部楼层 |阅读模式
    3.3.5 Hadoop RPC类详解5 K; Y" _( [' D* V6 M
    Hadoop RPC主要由三个大类组成, 即RPCClientServer, 分别对应对外编程接口、 客户端实现和服务器实现。# G7 w5 ?* h# t9 h7 c
    1.ipc.RPC类分析9 F! [4 x4 z. |: Z* y/ z
    RPC类实际上是对底层客户机服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。
    ) M( B- J; ]$ g9 G如图
    3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxywaitForProxy两类, 销毁方只有一6 y4 W8 I. j' S" A, R
    个, 即为
    stopProxyRPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户
    1 V, f  R3 I3 ^  I3 \设置一些基本的参数, 比如
    RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用
    7 T; \9 A* A5 C5 v2 w% M% q$ }5 R9 N& ?
    RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。
    1 c' }1 z: E& _0 F$ e% k2 k
    Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers
    9 g$ f6 g$ d$ c  C
    等, 目前提供了WritableWritableRpcEngine) 和Protocol BuffersProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
    1 r! [1 b) r8 u* \调用
    RPC.setProtocolEngine(…)修改采用的序列化方式。
    - j% f: V7 b. M& j下面以采用
    Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用
    7 d9 z+ ~3 o) t6 A2 l4 I0 B* [
    Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可" ?% l) ~+ T$ Z# y+ F0 m+ h7 C
    完成动态代理类对象上的方法调用。 但对于
    Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机
    ; Z4 ~5 {8 B" ]- Q程序那样直接在
    invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
    ) v4 V8 w; M9 O% T" F( ]$ S打包成可序列化的
    WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,
    0 N1 a) z0 x* X' T9 W函数参数列表等信息, 利用
    Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。# h- y8 f  U; e/ w* K5 y
    3-4 HadoopRPC的主要类关系图
    - G# A2 }! N& P* D' k  C5 a3-5 HadoopRPC中服务器端动态代理实现类图
    3 x) r9 b0 U; j
    2.ipc.Client
    % b8 j4 c" l6 ]$ z& X# l6 D8 n9 \
    Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执) i% A, }; N) ^# Q
    行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
      Q# R( p( N7 O$ T
    public Writable call(Writable param, ConnectionIdremoteId)
    5 A4 q% u; ]% {3 Sthrows InterruptedException, IOException;
    , |6 q. a! f: ^+ f2 N: l3-6 Client类图
    , L% p" h6 y/ I
    Client内部有两个重要的内部类, 分别是CallConnection
    2 M; U8 J. R, t0 z
    ❑Call: 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出
    , {: d. b0 J  |9 e: L- m9 S错或者异常信息
    error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺5 D% E  V: k$ U" J+ J, O: v
    序与结果返回顺序无直接关系, 而
    Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id
    / X4 v+ }( R  i  v' i+ S
    param两个变量, 而剩下的3个变量( valueerrordone) 则由服务器端根据函数执行情况填充。
    - G" [. H  Z+ d+ i3 }# N' h
    ❑ConnectionClient与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
    7 }& M, d+ \8 i3 u6 H4 V5 J信息主要包括通信连接唯一标识(
    remoteId) 、 与Server端通信的Socketsocket) 、 网络输入数据流( in) 、 网络输出数据流
    4 }) a3 |3 i% F* E* p
    out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
    " I3 M- F/ O7 I% l- \. F
    ❍addCall—将一个Call对象添加到哈希表中;
    : m4 D3 c/ v/ S' }) v" V* ^
    ❍sendParam—向服务器端发送RPC请求;
    ) g. x# ^# u, I9 x9 k
    ❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
    8 g4 I5 D1 l3 ~, K/ L  L% q
    ❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。
    . H; ^: Q; k7 m' R: M7 L当调用
    call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。" _1 o5 `6 r% b. J; C
    1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;
    7 Y+ d8 G5 ?  o2 ~2 A/ x
    2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;: g  z5 I, x3 m- L( c
    3Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;
    4 T) z6 V3 L; e4Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。7 h3 k* ], y3 [# J: g' A, [
    3-7 Hadoop RPC Client处理流程4 Y4 e$ K5 D6 S, e+ g7 V$ C) Y
    3.ipc.Server类分析" N- X. X% Y! [. E% e9 d
    Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNodeJobTracker [8] , 这是制约系统性能和可扩展
    5 l% t: q5 L4 q- F8 i性的最关键因素之一; 而
    Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计( N5 A- ]2 j; C6 D' j
    目标。 为此,
    ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用
    , d  T9 O% ?+ r* c
    JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。
    % {; Q& l5 s4 C9 L" @
    Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性
    , \' T3 ?1 ^7 |. D; n! d' p/ b能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的
    Reactor实现原理如图3-8所示。7 v4 z, t( u9 X2 P& }9 p6 o
    3-8 Reactor模式工作原理/ a- M. [$ w% Q: P) U- l7 K
    典型的
    Reactor模式中主要包括以下几个角色。
    - }7 z% \) h0 m❑ReactorI/O事件的派发者。
    ; g6 s9 u1 {3 j- g5 [7 x
    ❑Acceptor接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler
    " t; g" m; M7 }* {2 A  E7 `
    ❑Handler与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽6 [8 K! }8 w+ s6 x1 Y, N9 V
    象诸如
    readdecodecomputeencodesend等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适
    4 a4 M+ e3 W& ?当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次
    I/O事件到来的时候( 另一半可读) 能继续上次中断' L8 ~7 L) d# P2 j! ^- u
    的处理。2 ?5 @0 R5 |$ i3 \2 f+ m$ }: g$ B/ a
    ❑Reader/Sender为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线6 }7 @. c) w" E6 ]9 h' |
    程池中等待后续处理即可。 为此,
    Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对( P& A  Z2 O) w# S* q7 E; @
    应的
    ReaderSender线程处理。
    1 x8 d* ]  K  X- q7 I- _
    ip( l- p; M0 f- _. s- z- x" M
    c.Server
    实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易
    5 W, c- d4 w8 q地学习
    ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。% M0 @3 L& C- b: o  N) i4 A
    前面提到,
    ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为( f" Q2 p0 x% e9 V
    此,
    ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。. o( L* ?0 i! ^" {* i
    3-9 Hadoop RPC Server处理流程1 r" [" }! P. B' j% P8 P8 v
    1) 接收请求
    ! ^# F5 b8 k9 t6 O该阶段主要任务是接收来自各个客户端的
    RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue1 f' m# W4 i; ^6 ?. Q8 O
    中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由
    ListenerReader两种线程完成。9 Y1 O, d. \$ B
    整个
    Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程
    2 |# y* Q7 X) ~0 b. [7 e池中选择一个
    Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个+ h# C- I  w6 o& f
    Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。* r( W1 K8 v% f% C8 r7 Q
    ListenerReader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPTSelectionKey.OP_READ事件。- U) ]& z* N4 }& A' D3 u( e
    对于
    Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
    6 J5 |( w  w9 {+ d6 W
    Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call4 W: C4 L7 R. b: l. w5 d6 j0 P
    象, 放到共享队列
    callQueue中。1 N$ ^, m" P# M
    2) 处理请求  |' S! l; s* y4 s+ i0 w  U
    该阶段主要任务是从共享队列
    callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线. N( ?; A" i: \* i7 C  B. Y
    程完成。" V4 M4 ~$ X9 K1 t7 d; Z* w& A
    Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果4 T  t/ j6 e9 P9 ~) B+ S
    返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时+ ]3 j6 _$ [' @8 t
    Handler将尝试着将后续发送任务交给Responder线程。" u! H$ u" a3 ?0 |) z+ r
    3) 返回结果2 `8 H; ?; t  U; j' d4 h
    前面提到, 每个
    Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结4 B4 f7 F; e+ d8 P0 z; Q$ d
    果过大或者网络异常情况( 网速过慢) , 会将发送任务交给
    Responder线程。; Y/ m- }$ M2 j$ F5 F
    Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将) W6 n; D* n1 M; H
    结果一次性发送到客户端时, 会向该
    Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送% a5 G7 X# @8 E8 k3 {
    未发送完成的结果。0 v) x% `" z3 i
    3.3.6 Hadoop RPC参数调优: w+ F7 D: m; x7 [2 p" P
    Hadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。" d' ?7 [, F, D8 ]( {3 b
    ❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个
    4 D: F2 x$ N, h. h3 n5 {+ _8 {+ ]
    Reader线程。
    6 Q$ c" B  W6 @
    ❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
    3 N/ y+ R) c! i
    Handler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:0 ^+ m3 L: K" ?- l
    100×10=1000
    8 C5 F1 k( @5 Y1 C- a" v8 d' N9 X
    ❑Handler线程数目。 Hadoop中, ResourceManagerNameNode分别是YARNHDFS两个子系统中的RPC Server, 其对应的) D9 j8 s( {! r; D3 R* J
    Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-countdfs.namenode.service.handler.count指定, 默认值分别为
    + @7 _0 S6 u: \& u! s
    5010, 当集群规模较大时, 这两个参数值会大大影响系统性能。
    4 n5 S* }- A. I# L" ]
    ❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可" j/ f( K& A, a7 f$ {9 y
    能不利于对实时性要求较高的应用。 客户端最大重试次数由参数
    ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试
    ! I9 j1 m5 K/ R3 ]  d4 ]( B
    10次( 每两次之间相隔1秒) 。- |: l' q3 Q, X" l5 [: n  s$ p
    3.3.7 YARN RPC实现6 x; X. f, |4 p; m% V) o& I6 |
    当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] Protocol BuffersAvro。 同Hadoop RPC一样, 它们均由两部分组
    ' X! K' I5 J' ]$ A" t, n) \成: 对象序列化和远程过程调用(
    Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]
    * Q, M9 X5 {2 {( {2 g8 l! m) 。 相比于Hadoop RPC, 它们有以下几个特点:% W/ Q8 x) x9 ^" J+ D4 ?. `
    ❑跨语言特性 。 前面提到, RPC框架实际上是客户机服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用
    " H3 a& i: `: G0 _
    Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户% M# N3 [; w( v7 j9 O
    端和服务器端可采用任何语言编写, 如
    JavaC++Python等, 这给用户编程带来极大方便。# b. E# M* [, V7 M9 B
    ❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description LanguageIDL) , 它提供一套通用的数据类型,9 Q9 y4 E1 O5 M4 a6 l5 U4 g
    并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照
    IDL定义的语法编写完接口文件后, 可根据实际应
    ! h7 Q! v" v& R9 O* ]" ?' D用需要生成特定编程语言( 如
    JavaC++Python等) 的客户端和服务器端代码。# l9 l3 w3 r* `3 \! r+ N
    ❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者. G8 f) W2 U: {  o+ N1 ~9 Q
    删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。( @+ `3 D. h/ G  y
    随着
    Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:& e: s7 Z" y* X
    ❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言' y8 t1 e# s) }* {) i
    读写
    HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。% F, t0 g. M9 M9 U1 l* l) F: Q% g
    ❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如* x' k  C% \5 E% z
    果用户企图这样做, 会抛出
    VersionMismatch异常。5 ]- b: S7 ^- X7 X. U5 U
    为了解决以上几个问题,
    Hadoop YARNRPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之
    4 H6 ?* L2 L! }0 B9 D* ^后,
    Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源
    $ F6 x4 y4 u9 {3 w4 {- [
    RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的$ O; X( U0 ?6 U; C) i
    RPC, 而 AvroRpcEngine [11] ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache AvroProtocol Buffers对应的. l: |( j+ @: `; ]$ v+ l
    RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现
    " `, J% a, Z0 G中,
    Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。+ y+ j6 i) P4 x6 Q+ P: A  _$ j4 b
    YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信; s* c6 [# I6 T& o7 v$ h3 T
    协议。
    YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是8 ?& B# x6 v7 k- h! X4 G% A' j$ K
    org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPCHadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider1 ^# u; V9 r; L: D5 x5 m# Q8 m0 B  I# A
    成客户端工厂( 由参数
    yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
    % z: |- B6 G+ K% Z  T$ |务器工厂( 由参数
    yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根
    ( g. F# ]% O* A" k$ u. x据通信协议的
    Protocol Buffers定义生成客户端对象和服务器对象。
    ' |/ i# V! {+ t! y4 F, I3-10 Hadoop RPC 集成多种开源RPC 框架
      A5 s: M( P4 l) Y0 {( F2 N3 ?
    3-11 YarnRPC 相关类图
    - q. {! L4 M; e) s
    ❑RpcClientFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但
    ) a0 d+ [/ t5 x: Q# H" y它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口
    Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于4 w7 [( I4 _1 r: ?$ d% M
    JavaXxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前4 j% W" t1 M1 W$ Y
    "PBClientImpl") 。5 ]$ }9 {: M+ V+ n9 Y4 _" y
    ❑RpcServerFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄
    : I2 _( _# e$ ?& U# E0 i(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java0 B* d* O! f0 k/ {  Q4 c
    名为
    XxxPackage, 则客户端实现代码必须位于JavaXxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现+ y% `% V, A$ o9 H3 a
    类名为
    PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。( B0 L0 c. ?) m! x7 x/ o+ m
    Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以: [4 ^1 M* A: c
    下几个方面:. g. \* h, I; b/ N( G! c
    ❑继承了Protocol Buffers的优势 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允" I: ~% H( M5 S7 [; u! ]( {
    许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用
    # @, y% e) V/ f6 y5 |户为某些服务(比如
    HDFSNameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol BuffersHadoop 自带的Writable在性能
    , B0 g" i. P, \. g  ^方面有很大提升。
    ) L4 y. ^8 _% I% _; X( a
    ❑支持升级回滚 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为ActiveStandby两种角色,
    - D! `' o8 Y2 f" O其中,
    Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol  I! }& A# v2 |. ]8 J
    Buffers
    序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备
    0 L. ^! a# M+ a# L
    NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。
    + v) |. U- D7 L% n3 W
    3.3.8 YARN RPC应用实例
    6 V1 B; H4 Y; ]为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。  K* z+ f/ w! Y, p5 L7 |
    YARN中, ResourceManagerNodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户: u8 i% O3 Z$ N( u; [$ i% p
    端,
    ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManagernodeHeartbeat) 向& {; u) Z. ^6 O& N3 ~1 P! K+ H  z; @$ w
    ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:1 J% z9 N/ h( A
    // ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server
    2 L, ]# o$ E7 |7 V; _" a, Hpublic class ResourceTrackerService extends AbstractService implements
    4 y6 a- R3 _; N" vResourceTracker {; B+ w* {1 L+ a$ u6 b
    private Server server;7 h) p3 u3 y- T0 p
    ...8 [' L- o- P- U, k& f: ]
    protected void serviceStart() throws Exception {7 K9 c0 V/ u/ x
    super.serviceStart();( p% e9 u: {& f5 }6 V
    Configuration conf = getConfig();
    5 h  `) H3 w; eYarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC' p/ E+ I, Z& w' Q
    this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,- v. o* w. c1 E' ^
    conf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
    $ f' \% O# [3 U: AYarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
    ! W" i/ m5 g, \7 ^; a$ Qthis.server.start();" C2 X$ h8 O$ I) F" X+ n
    }.) B& t  k8 P/ M3 `
    ..
    2 m! {% ~2 s: L@Override0 n2 o) W5 J" R7 `
    public RegisterNodeManagerResponse registerNodeManager(
    ) m) t* N- L* C4 |/ DRegisterNodeManagerRequest request) throws YarnException,
    6 }1 {3 J; ~' s- w  m6 iIOException {" n; _6 W$ J9 C: ^) \0 h. h
    //具体实现
    ! y) ]; m/ V$ R4 ^0 a
    }@
    2 ]3 T* Y' Q% Z0 cOverride
    8 {; `2 p* y# t, _, Z0 P5 Epublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)' q8 W' t, p" Z
    throws YarnException, IOException {6 f2 h# y+ l( c, u7 C
    //具体实现
    * Z. I0 ~/ ], ^$ k  z# P8 O7 [) @
    }
    ( p) o1 r% Q$ J" Q! I$ u}
    5 Z' z- u' f9 f1 K& N1 NNodeManager(客户端) 中的相关代码如下。4 }9 t& |! l! e; _- u; Q9 l2 a$ ?
    // 该函数是从YARN源代码中简单修改而来的
    2 W9 @" \5 w' f
    protected ResourceTracker getRMClient() throws IOException {  d6 @3 u* X  p* e4 G( b( D2 W
    Configuration conf = getConfig();
    & K' X# @( Q2 Z3 ?1 WInetSocketAddress rmAddress = getRMAddress(conf, protocol);
    8 L" p+ D/ G3 E$ `( ]" {RetryPolicy retryPolicy = createRetryPolicy(conf);
    * Y( ~# a* n5 k) ^ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);
    : v9 j# Y% N* [) T  nLOG.info("Connecting to ResourceManager at " + rmAddress);
    3 B2 r4 q7 w: ?0 A% O& T1 L# N) preturn (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);9 P( u$ p; D: V/ Y$ y( ^7 ?
    }.2 c3 c0 S0 o' H/ s5 }
    ..
    & H6 M- H) g. w6 e5 qthis.resourceTracker = getRMClient();# o/ [4 F0 |% a& O/ P$ D
    ...! I; ?1 q/ [4 j$ T. v% J
    RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);$ |$ ^2 n4 ^0 p0 h( _; d% F- ~
    ...
    ; T& |$ J+ V+ K2 C7 ?2 gresponse = resourceTracker.nodeHeartbeat(request);9 M+ U/ z! u) G* i4 F7 `  x
    为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。; n# K0 R: `, _% p' s1 @+ |
    步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManagernodeHeartbeat! y, Y; K" H$ O& `% n1 U
    两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:0 E. O: {. k; J
    public interface ResourceTracker {
    # {0 N6 ~% R' k4 `7 Ipublic RegisterNodeManagerResponse registerNodeManager(
    6 P* F8 p" d) T: DRegisterNodeManagerRequest request) throws YarnException, IOException;
    1 p; l3 B: c# e" W8 L# kpublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)+ v) T" E: s2 r" o- I; v3 q
    throws YarnException, IOException;- r0 H  v$ M! E' }. x- q2 H% g
    }. X' P6 G2 W9 R1 u9 H2 x) R! R7 t2 E9 z# j
    步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但
    + W2 ?+ v5 Z/ V1 p未提供
    RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的/ l( _0 |2 m& U5 q6 ~# ~  z% H
    Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:  ~8 S1 v7 b1 _3 ^
    option java_package = "org.apache.hadoop.yarn.proto";
    6 {$ y  J) W5 z6 k) {* K# o$ Poption java_outer_classname = "ResourceTracker";
    $ o* ?+ h( g. {0 z( }option java_generic_services = true;6 `0 }; _0 A6 f" Y
    option java_generate_equals_and_hash = true;. }2 V! a# E+ ?8 l5 P1 `" B0 x4 |
    import "yarn_server_common_service_protos.proto";8 d6 m& G* r2 @) [+ J
    service ResourceTrackerService {8 q7 c- J# x% [* \/ f
    rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
    4 w0 l, e" M# ], [1 {8 [5 g: D* Irpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);. q' X0 g- q, S6 M2 {- Q& v
    }+ L0 f: l6 ?# }+ R& R+ p) g& i8 G
    ResourceTrackerRPC函数实现是由ResourceManager中的ResourceTrackerService完成的。! V1 v! ]: f5 e
    步骤
    3 RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol
    3 ~2 i, V- q' ^. i" xBuffers
    定义的, 因此ResourceTracker协议中RegisterNodeManagerRequestRegisterNodeManagerResponseNodeHeartbeatRequest4 z& B$ a$ j: A1 y4 _
    NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto
    * A. v* q: A7 }5 i4 o9 l5 ~件) :
    . _6 D! Z7 J! {$ u4 x6 [
    import "yarn_protos.proto";+ f  S% i6 Z, B
    import "yarn_server_common_protos.proto";4 |4 i  ]* y5 D* ]1 |: g4 A
    message RegisterNodeManagerRequestProto {
    0 q# p# k8 h# `+ D" d5 U7 Roptional NodeIdProto node_id = 1;" G0 p9 W# S1 I& l9 x4 ?
    optional int32 http_port = 3;6 e; D& K" ?' F/ V: M( ?
    optional ResourceProto resource = 4;
    * m- d- c% z* I9 Y; v* V- J2 \! V, F} m
    7 U* v( J$ U! Kessage RegisterNodeManagerResponseProto {' V$ v. z7 ?! O4 ~& v
    optional MasterKeyProto container_token_master_key = 1;
    , {! v- R2 Y2 t' W/ Z2 i( w) doptional MasterKeyProto nm_token_master_key = 2;5 ^6 _% a. b5 l+ Z# f
    optional NodeActionProto nodeAction = 3;, t' O0 @# P) M" S7 _) v& Q2 U
    optional int64 rm_identifier = 4;' [2 Z& E3 Z+ o
    optional string diagnostics_message = 5;
    0 M* W) g0 H# M. S6 ^}./ k# Z" r- M$ d6 G
    .. //其他几个参数和返回值的定义
    ( d3 S9 K1 [" F( g
    步骤4 RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以
    9 H# @% m8 M( f: ?+ q( P6 P原生态
    .proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol
      T  y$ W0 v& {2 n/ C% qBuffers
    生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数+ e) |. s* M6 t9 C3 |( O, r
    RegisterNodeManagerRequest为例进行说明。
    6 u: P) c; z3 s
    Java接口定义如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords) :7 |$ i9 s- n' S0 Q$ S
    public interface RegisterNodeManagerRequest {
    % t- B3 \* C; E% Y; ~; J9 [NodeId getNodeId();% _1 g# B) q# N
    int getHttpPort();; _1 L) M& f/ {2 |0 L/ o9 Y
    Resource getResource();
    7 j  Y# T6 K/ m  Lvoid setNodeId(NodeId nodeId);: T7 b8 c& G2 b% ?- r1 M
    void setHttpPort(int port);
    0 o* P2 W- m7 a# K& @: w/ Q% T6 ^void setResource(Resource resource);
    4 I% E$ m' u0 a7 F6 I}+ {" G. L" r" P* N. v4 W
    Java封装如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :
    : D+ o% J$ ?$ ~' r6 J' w  G# x
    public class RegisterNodeManagerRequestPBImpl extends# M* ?& m( {+ d3 A  S$ G
    ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {7 Q' W* s# m. Y' i7 _! {7 W
    RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();+ ?% r/ G; m/ P
    RegisterNodeManagerRequestProto.Builder builder = null;
    - S- {4 G) j8 x. q/ }4 G' jprivate NodeId nodeId = null;% S9 H. K* g1 X- X
    ...
    - _2 j. o3 L$ V  g3 Z@Override, {, A+ M, h0 W/ }' v4 c
    public NodeId getNodeId() {
    8 h8 ]) K1 X* M1 M& bRegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
    4 D% p, K9 N9 Z& Iif (this.nodeId != null) {
      M$ f( b8 j, j9 M" oreturn this.nodeId;9 y  ?* f( ?/ l( z8 U( I
    }i
    8 Z8 L- i' e+ G# U, R9 b5 D. }# rf (!p.hasNodeId()) {
    % v) G. R2 I# L, e9 ^  dreturn null;
    1 B1 h! a( K- Y  ~2 A! z( a& r} t
    . _4 x+ z) \. t, r" P# S; Dhis.nodeId = convertFromProtoFormat(p.getNodeId());: W& m. [& @" F
    return this.nodeId;- V! m3 w: X, S% Z% ?  W
    } @4 R" E. L7 D- K- u+ c- M8 m9 e0 R
    Override% i% \7 Z4 B2 f$ s: h. P, Z: P/ P
    public void setNodeId(NodeId nodeId) {
    ; }/ E( h" m! ~$ |6 z  O; tmaybeInitBuilder();
    ; z$ r8 N  L3 V* B2 Y& Dif (nodeId == null)
    , ^4 M, R" _1 n5 b& ?builder.clearNodeId();5 N( I  Q0 Q9 ?! R7 E) }( \
    this.nodeId = nodeId;3 [" Q5 z8 c& w8 Z1 @( b* ?/ Q1 y  l) x
    } .- A4 g1 x. N  a( Y: P5 y
    ..
    8 g  r* _5 k& w( o" a( ]}
    8 c/ e5 @* ]1 \步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为
    2 n/ _- B5 H- T
    ResourceTrackerPBClientImpl, 实现如下:2 U: G7 p- Q. T0 E* R7 m* t
    public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {
    ( F( n1 R/ o" t$ Sprivate ResourceTrackerPB proxy;
      S/ u5 d! ^+ h, r9 Mpublic ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {% B2 J/ P9 {3 q
    RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
    & i: C1 N8 p% `1 V% Mproxy = (ResourceTrackerPB)RPC.getProxy(
    9 O4 h! Y3 a1 W0 c. SResourceTrackerPB.class, clientVersion, addr, conf);
    $ U1 b# v0 F1 C( q9 q} @# c5 `! W) B& @7 A* u+ D
    Override: X, [) [2 Y! Y3 h, Q
    public RegisterNodeManagerResponse registerNodeManager(
    ( @: C% f! T, M; y) {, D! bRegisterNodeManagerRequest request) throws YarnException,# b1 W# f$ v9 U
    IOException {
    ! M9 D! F. T( s3 _8 V  ^RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();. q1 m. d* h. J* N: P6 L
    try {/ f. _3 D: `" `4 r( C3 Y
    return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));8 p: O/ [$ ^; m# E, P
    } catch (ServiceException e) {$ H! G1 o; S$ B
    RPCUtil.unwrapAndThrowException(e);3 {- c. y* d4 m" m5 Y* {
    return null;' G8 }* S# C" X! P+ o; k, @
    }
    " G" u6 h# D! y1 F3 {. w7 o} .6 \; O: X- A% W- \1 s/ q- ^
    ..* _1 i# f& T. o$ R* r& F6 Z
    }
    6 Q+ C7 [! |$ S# g8 v9 ?5 J服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:1 p: \& l! D; Y2 L
    public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {
    * m4 u! D# L; U. {. M! Z; e+ ]private ResourceTracker real;+ V. c  D( U2 h4 b
    public ResourceTrackerPBServiceImpl(ResourceTracker impl) {- W& F+ i% o. j7 d
    this.real = impl;
      Q- D1 S0 F& B* I} @+ S; p2 v  D4 A4 n2 T, g
    Override
    6 B  T& n% p7 G) apublic RegisterNodeManagerResponseProto registerNodeManager(
    ; s3 Y/ Z3 X! V0 NRpcController controller, RegisterNodeManagerRequestProto proto)- m% K0 |, [; u, q: k1 G# |4 I
    throws ServiceException {
    ( w3 H: G5 a3 [  |- D8 I& n! f6 gRegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);" M' ~3 v4 U+ ^
    try {
    1 @6 r' U' \7 d6 F3 q0 u" mRegisterNodeManagerResponse response = real.registerNodeManager(request);
    ) x" F6 Z6 ^2 q0 Y" @- \# D8 J* Greturn ((RegisterNodeManagerResponsePBImpl)response).getProto();
    / Q! b' ~! f2 ^+ ]} catch (YarnException e) {# X$ |5 D( W; _
    throw new ServiceException(e);4 p0 e5 R8 H6 U
    } catch (IOException e) {
    5 l* j- s, y0 p" C8 \5 E5 i5 ?throw new ServiceException(e);
    2 |+ w3 V6 K6 E0 @0 F}, e/ T2 _9 o( G" b" ?7 e) u
    } .& C0 V3 q  ~, C2 I( g9 `# c
    ..4 Q( A( H3 E. |* z5 g. b. F, Q" N
    }- L5 Q9 K  p, g: G
    总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTrackerYARN实现了一系列) j' d6 Z. j6 o7 p
    Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。5 S: Z3 Z% N' M1 M4 F+ u6 c
    3-12 YARN RPC中的Protocol Buffers封装
    6 v4 t& g& q! k. f( J" z0 k
    [6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call# r* m+ x0 C# s
    [7] Doug CuttingHadoop最初设计时就是这样描述Hadoop RPC设计动机的。1 T+ C. ~* ^9 D6 T8 k+ }
    [8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。' e4 ^; v- J9 D+ ^: L  P
    [9] 参见网址http://thrift.apache.org/
    1 C, E# ^$ E4 K4 u1 z  K" V/ }
    [10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns$ u% w5 ~2 Z+ x* A' `
    [11] AvroRpcEngineHadoop 0.21.0版本开始出现。$ k- l( {) W* p) N. J
    [12] ProtobufRpcEngineHadoop 2.0-apha版本开始出现。! P# Q- m, S0 G5 y6 K2 k! i  q
    [13] 参见网址https://issues.apache.org/jira/browse/HADOOP-73470 f4 w' n2 B3 i7 C  R2 U; B; e
    [14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像ThriftAvro那样支持多语言编程, 但引入Protocol Buffers序列化框架则9 O0 F# B9 s; \; V1 w4 p3 X4 }
    使其向前迈进了一步。
      ( U1 o( v) S* y+ I, \3 l! }. t% s

    # s' E6 q' W& }, V
    3 g1 c: P4 g% H, Z) Q& F
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-4-30 00:07 , Processed in 0.107747 second(s), 27 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表