java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 2746|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.3】part2

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66093

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:56:45 | 显示全部楼层 |阅读模式
    3.3.5 Hadoop RPC类详解9 O; G" {6 y6 W6 V5 H2 H  \* }1 r
    Hadoop RPC主要由三个大类组成, 即RPCClientServer, 分别对应对外编程接口、 客户端实现和服务器实现。. [  k8 x5 x/ u, u$ |+ m
    1.ipc.RPC类分析
    2 e0 E7 S) m3 }8 V+ G
    RPC类实际上是对底层客户机服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。
    . h# |5 ]) Q8 ?; ]! {. c8 H8 N, o如图
    3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxywaitForProxy两类, 销毁方只有一
    - _8 l1 i) F- u; X* r2 R& a个, 即为
    stopProxyRPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户  S& u+ A1 Y  z3 l  B/ d
    设置一些基本的参数, 比如
    RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用
    * _3 F" l3 K# H  y$ M* v
    RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。  i/ k6 U9 \5 x. l- c2 c! o( G* y1 f
    Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers) |( Z% O5 c/ ]( H, k
    等, 目前提供了WritableWritableRpcEngine) 和Protocol BuffersProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
    $ s1 n) ~8 {( j6 ]/ L调用
    RPC.setProtocolEngine(…)修改采用的序列化方式。' g# H) h/ D& \5 N
    下面以采用
    Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用$ h- N5 O( Z5 ], c& T
    Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可
    . l  z* M6 ]3 ~4 y2 d完成动态代理类对象上的方法调用。 但对于
    Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机
    ; Q: d1 k+ S; A7 J# a程序那样直接在
    invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
    / o9 F9 S3 ?' h1 u9 ^4 [打包成可序列化的
    WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,& ]+ Q! @- ~! f' q$ c: h  E3 b
    函数参数列表等信息, 利用
    Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。
    " g: [, Z. @5 ?7 [& p# K
    3-4 HadoopRPC的主要类关系图  t9 S3 |6 _+ o9 u9 B4 G+ H' z# E
    3-5 HadoopRPC中服务器端动态代理实现类图* ?8 x! e7 j5 b& f; @3 t+ ]) ]2 J
    2.ipc.Client
    4 v: ?# L9 a3 q
    Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执8 `% g! c0 ?$ ]3 |/ T8 D
    行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
    4 Z8 }; K. ?" |( i" _. m# c
    public Writable call(Writable param, ConnectionIdremoteId)2 d  N. Z4 d% z6 }. z0 I8 x% w
    throws InterruptedException, IOException;, a2 Q8 ^# d% u" E3 g. p# S
    3-6 Client类图
    3 C: }# Y+ Q- a" J8 m' X" J+ N) E9 @
    Client内部有两个重要的内部类, 分别是CallConnection, \  X- ]2 q0 @4 t+ o" [
    ❑Call: 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出% Q* F5 i- w# S  E4 r
    错或者异常信息
    error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺
    & f( G# ]7 j& i7 `% U# T7 ^序与结果返回顺序无直接关系, 而
    Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id9 l; x& M6 o$ y) u+ a
    param两个变量, 而剩下的3个变量( valueerrordone) 则由服务器端根据函数执行情况填充。
    # s9 N7 [3 e9 K! ]  f8 u) H4 l# Y9 }
    ❑ConnectionClient与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本) a- U" S" j+ F8 s0 u% d
    信息主要包括通信连接唯一标识(
    remoteId) 、 与Server端通信的Socketsocket) 、 网络输入数据流( in) 、 网络输出数据流
    " ~' P9 s$ |, f5 i
    out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
    0 i" K& [( o) D1 j# e, n6 i
    ❍addCall—将一个Call对象添加到哈希表中;
    9 k7 Q* J% d# A$ \, r: |2 U
    ❍sendParam—向服务器端发送RPC请求;
    0 ~( a0 J) U" c
    ❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
    4 M: r5 Q* ~* o+ X8 J
    ❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。5 p- w) }2 S" p8 }
    当调用
    call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。
    ( V7 z$ X5 Z3 Y" c; K2 {
    1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;
    + _& f3 V$ X6 H5 ~" m+ C2 \$ [
    2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;
    3 t" f0 v1 c! J: o
    3Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;0 s' b# ^2 m/ t$ ]8 S, \8 F
    4Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。  O6 }# g* m$ L0 G
    3-7 Hadoop RPC Client处理流程
    ( j, P' d- L/ l1 ?1 C5 }! y; N
    3.ipc.Server类分析: z9 {+ a( Q: g9 a, C( y: O
    Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNodeJobTracker [8] , 这是制约系统性能和可扩展3 @7 T5 l6 ?) ~2 {% P
    性的最关键因素之一; 而
    Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计" l. F5 k/ X; E
    目标。 为此,
    ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用& ^* }3 v1 S' s* n( q
    JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。# k! D% Y2 G# N  h: y8 u! m. y' L
    Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性; t4 H. P" k. z0 T/ E8 |
    能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的
    Reactor实现原理如图3-8所示。- v' `# B' W9 w; G: ?3 e* P' B
    3-8 Reactor模式工作原理
      q! S: s0 R8 @+ ?- Y典型的
    Reactor模式中主要包括以下几个角色。
    ' V9 D, P7 O6 i4 c! z/ W( B- o0 Z❑ReactorI/O事件的派发者。/ ~+ [  c7 L  Y- U
    ❑Acceptor接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler
    - B/ }, U' J' v, t9 l6 i
    ❑Handler与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽: W8 `$ {: `3 d/ O2 G
    象诸如
    readdecodecomputeencodesend等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适
    9 [8 e. w2 g. Y9 L' Z) I+ t6 d& @当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次
    I/O事件到来的时候( 另一半可读) 能继续上次中断
    + _' A* R. ~) }- h8 o1 F4 ?3 m7 W! L的处理。
    4 e7 t) e% L1 t, W8 ~5 T. D
    ❑Reader/Sender为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线
    - I/ q7 r0 P3 A2 o8 a, x程池中等待后续处理即可。 为此,
    Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对
    3 c* c3 h% z2 L) s& _- {- p应的
    ReaderSender线程处理。
    ( u( b4 I+ z$ b3 Q$ w  K
    ip
    + X, q2 J) u5 w! M# d: uc.Server
    实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易
    " |2 ?7 B8 y) J; |3 `: L地学习
    ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。& m; p" \8 \0 \8 V2 i* l9 Q
    前面提到,
    ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为8 ]3 \. l( x6 h# c, j
    此,
    ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。/ j1 K9 H; B1 L/ f$ s
    3-9 Hadoop RPC Server处理流程
    ' `6 B% R! ]4 q  n. N2 {
    1) 接收请求
    ! a0 I+ y  |- J/ q, q: r  r: ^该阶段主要任务是接收来自各个客户端的
    RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue4 x% b+ }8 N% D! o
    中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由
    ListenerReader两种线程完成。
    $ |5 c7 M$ K+ D. r: A1 L5 U4 A整个
    Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程6 x* T. p9 o; I6 T8 f* c8 h/ B( ~
    池中选择一个
    Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个) G1 v0 }. h; v1 E& ]
    Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。
    ; J- V+ D, j( }# O. z! JListenerReader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPTSelectionKey.OP_READ事件。5 J4 m# B( D; D7 t& K& i, z
    对于
    Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
    " n5 M  e, W" V% N; ~4 }' P
    Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call9 e5 v/ K- c- d% p0 M2 N! k, ^
    象, 放到共享队列
    callQueue中。
    ! f3 G: D( H* c! T! ?
    2) 处理请求
    6 n4 V4 h' ~6 G3 h5 e( |+ T+ l' \: G1 f该阶段主要任务是从共享队列
    callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线
      K! a$ `$ }8 `1 k4 `. A程完成。! @. v% B7 d% o1 ~
    Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果; d9 E5 _2 N" B0 u
    返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时" }; d2 |+ p) N) d2 b- j
    Handler将尝试着将后续发送任务交给Responder线程。0 a! I! `* W- U0 W
    3) 返回结果
    $ e/ ]# L: v% C/ q# D# L前面提到, 每个
    Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结) z/ |& h+ I! S  Y: O$ O8 V
    果过大或者网络异常情况( 网速过慢) , 会将发送任务交给
    Responder线程。5 \9 o4 {  b- ~4 o, v1 T
    Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将
    - c% i  g. k9 T; H结果一次性发送到客户端时, 会向该
    Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送$ w) w/ k9 S$ m& a( P& O  r
    未发送完成的结果。
    " `, ^: m" Z5 G9 x  m% Z8 S
    3.3.6 Hadoop RPC参数调优: Y5 \9 i# y, H/ S2 h- G
    Hadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。
    % f7 |! t& Z( c: \
    ❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个
    5 m6 ]8 R+ \+ j" b4 c
    Reader线程。
    ! j+ P( x# i- `; c
    ❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
      x1 r3 V8 `; Q3 R' c' t6 {$ v
    Handler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:
    5 i. \  _. I( ^, ^
    100×10=1000! q2 m2 C) l6 U8 q; p+ c; E, F
    ❑Handler线程数目。 Hadoop中, ResourceManagerNameNode分别是YARNHDFS两个子系统中的RPC Server, 其对应的9 F: ~9 v/ M+ a" [$ A5 C
    Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-countdfs.namenode.service.handler.count指定, 默认值分别为
    ! R' g4 u3 m. |
    5010, 当集群规模较大时, 这两个参数值会大大影响系统性能。7 R/ U* b7 z' x7 s9 m5 c8 s' A# o3 j
    ❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可
    ( t# S: g/ Q5 v- n+ P$ G% v能不利于对实时性要求较高的应用。 客户端最大重试次数由参数
    ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试
    5 A6 ?9 a7 F" h1 N# P" r1 C
    10次( 每两次之间相隔1秒) 。
    ! R* u4 z: J4 M& W. ~/ k$ @
    3.3.7 YARN RPC实现! H/ w6 U. E( P$ Q$ r
    当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] Protocol BuffersAvro。 同Hadoop RPC一样, 它们均由两部分组
    ( g8 P1 l4 M. Z成: 对象序列化和远程过程调用(
    Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]9 C/ k  g! {: V# o/ b
    ) 。 相比于Hadoop RPC, 它们有以下几个特点:6 h- c0 @* Z# `5 S0 Z* U: B7 x
    ❑跨语言特性 。 前面提到, RPC框架实际上是客户机服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用
    % j! i) A; ^  i' o7 Q
    Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户$ Z0 s1 I7 F  Z' L- o- p3 Q" b
    端和服务器端可采用任何语言编写, 如
    JavaC++Python等, 这给用户编程带来极大方便。5 J8 Y- f1 Z- p
    ❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description LanguageIDL) , 它提供一套通用的数据类型,
    7 e8 R0 R; N$ ?0 ]6 W+ S并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照
    IDL定义的语法编写完接口文件后, 可根据实际应  w5 c0 ]1 a$ J3 `: P
    用需要生成特定编程语言( 如
    JavaC++Python等) 的客户端和服务器端代码。
    : [, f8 P1 R( Q* T
    ❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者2 S2 G$ q2 L7 s) N6 O. J
    删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。. ~9 m' k  z  |) S
    随着
    Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:
    2 p; Z/ A* g* V5 T9 q
    ❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言! j- u; l1 e4 e& n. [
    读写
    HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。& ^3 b9 c6 w$ |/ ?- h% d  T
    ❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
    / q- H- o" @4 F5 L9 x. V果用户企图这样做, 会抛出
    VersionMismatch异常。
    * X+ D9 ^: R$ z. A2 F) Z为了解决以上几个问题,
    Hadoop YARNRPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之
    $ I* Q' r7 s8 a1 _后,
    Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源0 M8 `: C- Y2 P! `" d
    RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的2 n- @( v: z! b7 A
    RPC, 而 AvroRpcEngine [11] ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache AvroProtocol Buffers对应的4 d( D9 m2 b  k  J6 g; n( i
    RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现
    ! Z; B7 ~6 O1 x% K! @* d; ~2 c中,
    Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。
    2 C& M* U5 Y! H; d' V2 `
    YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信. h* t+ I. E! d4 x
    协议。
    YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是
    / c! o" i8 O5 h4 A6 r( K( r0 I
    org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPCHadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider( W, `: d, a% R: P) F  `
    成客户端工厂( 由参数
    yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
    " k- \" Y# _8 E) i' [9 N8 p6 p2 y务器工厂( 由参数
    yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根
    4 R: B2 f' F! _( a9 m& j' I据通信协议的
    Protocol Buffers定义生成客户端对象和服务器对象。
    * p. I. ?8 N- A( L3 v1 W  z' l8 W3-10 Hadoop RPC 集成多种开源RPC 框架5 F2 B( P  y3 P% M2 y
    3-11 YarnRPC 相关类图/ a" l" X" o" M; E  d
    ❑RpcClientFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但
    * H/ J" c/ [0 E: e3 g它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口
    Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于* K$ y$ ^6 J; W. N
    JavaXxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前
    , i: p9 n; R% D, o
    "PBClientImpl") 。
      A7 {# c9 }: D% W' @* j& t
    ❑RpcServerFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄
    6 s0 c# |% y) }: d. d6 z& H(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java. y/ r: U+ ^1 z  Q/ p/ x/ M" c
    名为
    XxxPackage, 则客户端实现代码必须位于JavaXxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现
    & K5 I2 F  R& I/ V: |类名为
    PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。
    * H% h. Y9 ]  v
    Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以
    ' q4 X7 t2 e; }/ b* s. w下几个方面:
    ; o2 w, k1 v0 h3 v8 u; M; k
    ❑继承了Protocol Buffers的优势 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允
    $ q! i- N) ?- t' ?许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用
    * L7 I2 [1 f0 K/ Y. M. N户为某些服务(比如
    HDFSNameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol BuffersHadoop 自带的Writable在性能
    1 a2 f2 A2 R: l8 [/ {, P方面有很大提升。6 l6 v7 `1 f9 Y" ^* ~# ^  N$ ]
    ❑支持升级回滚 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为ActiveStandby两种角色,$ r/ [( `8 Y5 M  z
    其中,
    Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol2 K" s; `+ G, w% R
    Buffers
    序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备0 [  v+ ]3 G! F4 t" E8 [
    NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。1 I# }& o9 S; w+ K( D$ _
    3.3.8 YARN RPC应用实例8 c. ]/ z! B" D3 l
    为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。. d# [3 ?/ d( s1 {7 W! _
    YARN中, ResourceManagerNodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户
    - n3 T  L7 T. s8 L7 Q端,
    ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManagernodeHeartbeat) 向+ a: a. S! P9 A4 u+ j$ h
    ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:
    ; T% I/ j3 v5 r6 A
    // ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server- v1 P3 g0 W) c3 Z) ]
    public class ResourceTrackerService extends AbstractService implements
    7 @4 Q! }# `# m8 T) V& h) l. }ResourceTracker {
    3 i6 x9 I2 M% L" Lprivate Server server;
      n% A+ D/ ?7 r0 A9 }...
    8 F5 I/ e- x( I& h8 n0 Z, E1 Vprotected void serviceStart() throws Exception {9 T% p5 t% ~) T3 z2 O" ]
    super.serviceStart();
    ! I1 R, Y8 Q3 V5 h& @Configuration conf = getConfig();* ^9 r; \2 A/ V+ ~9 U: L( H
    YarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC
    # j; u/ |( K% ~' {
    this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
    . M3 U" s+ o* ~$ _8 Y1 ?% Z& O1 Yconf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,' j* E4 z9 J0 W7 a/ D' r, Z5 m
    YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
    " F  M9 L/ a* O1 k3 K9 x/ w, Nthis.server.start();
    5 |" f8 Y( t+ j}.
    1 @! m3 E1 w7 T+ N! d, Y..
    4 \, t  u# v/ E8 I/ }1 {@Override2 K) w" x4 H5 M- ]
    public RegisterNodeManagerResponse registerNodeManager(- D3 W. F% v/ ^3 C6 @  c+ B1 R
    RegisterNodeManagerRequest request) throws YarnException,3 J. d& i7 ~) u1 x; b+ S* O+ z7 k3 ]
    IOException {) I3 P, c( b$ G. ?5 ?# B0 z: f. i/ s
    //具体实现
    ; ^7 M( G$ ?1 J6 W' W
    }@1 K4 G; [  n& r, o7 F) T8 I" ^
    Override
    * o& Q' D3 e# m; jpublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request), F9 _+ r9 ~+ V2 p' ~
    throws YarnException, IOException {8 `2 H/ X/ O) a
    //具体实现
    ; R$ o% v7 Q; C' K; h4 z: F
    }. b" ^5 k% c2 V. r% T, o: v
    }- O) m) c2 V. p+ ]4 P
    NodeManager(客户端) 中的相关代码如下。
    # p$ d$ q5 @& m- B& a/ j# w/ C
    // 该函数是从YARN源代码中简单修改而来的
    ' a# n  j5 ~7 D' M( k
    protected ResourceTracker getRMClient() throws IOException {
    4 @3 U& z* s% sConfiguration conf = getConfig();
    5 [) U0 p& m( i5 ~; Y  d/ lInetSocketAddress rmAddress = getRMAddress(conf, protocol);3 n1 [: e3 {# [: V' y( U3 F
    RetryPolicy retryPolicy = createRetryPolicy(conf);$ N9 I; m4 ]' B/ ~7 C
    ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);9 q6 Y' W9 @) i! Y4 |7 o8 B1 P
    LOG.info("Connecting to ResourceManager at " + rmAddress);& D$ _6 F5 f7 Y
    return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);
    & _. I- b. c; K) ?) y8 Q. v8 c; V}.# ?% J, C* H5 C1 R
    ..
    ( |) P. v# q  h; M9 i' o# kthis.resourceTracker = getRMClient();; I. {, `' [: ]0 D$ b& k$ B
    ...
    # @% S/ Q  [* Q) j# o9 l- A/ O" f. X( IRegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
    0 X- ?7 Y1 B: v$ r3 V8 ^...% Y  L5 x1 X, Z3 G, O
    response = resourceTracker.nodeHeartbeat(request);
    ) s: |6 d2 J0 k$ `为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。
    # o7 d% G/ w! f" J; s步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManagernodeHeartbeat
    ' L7 n4 l6 c$ ~
    两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:6 v9 ^* A/ j9 i) p) m  A. p
    public interface ResourceTracker {
    8 o1 l6 \( Z( Epublic RegisterNodeManagerResponse registerNodeManager(& K1 Q$ z* U# L# p3 O2 [- q
    RegisterNodeManagerRequest request) throws YarnException, IOException;
    ! [: z6 Z% e, c( L( o6 \, U1 g6 Fpublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    . f6 y8 l: o) C# k  v+ hthrows YarnException, IOException;
    8 L7 O! t' X/ E1 F- }* Q- ?  ]}
    5 w. {+ m" V, @0 Y  I% P- n9 N0 J6 G步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但
    6 p7 |/ J" Q: u- r1 e- G未提供
    RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的
    " T8 ?. d3 V, Z% \$ i% c5 g/ ~) a# O
    Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:2 Q3 M# \6 V: H, o* \, d) E
    option java_package = "org.apache.hadoop.yarn.proto";
    ( P* \9 }* p- o3 K  y% coption java_outer_classname = "ResourceTracker";, i6 d5 j/ E% B3 F  o
    option java_generic_services = true;$ l: w- Q: `& y% N3 h4 B
    option java_generate_equals_and_hash = true;4 s) c/ |0 c% W; W, s* I
    import "yarn_server_common_service_protos.proto";
    $ C$ I& V1 g) G3 a5 r: b7 wservice ResourceTrackerService {
    : g# F7 r* |0 i' Xrpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);5 S# f0 w( V5 D; Z- y
    rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
    & Q4 M" r- l) n+ U}
    ) }. B( [" y& H( P) lResourceTrackerRPC函数实现是由ResourceManager中的ResourceTrackerService完成的。8 \7 ^+ m0 v5 q. R! j
    步骤
    3 RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol% |  |1 s. X" G" D( N+ E0 `" d
    Buffers
    定义的, 因此ResourceTracker协议中RegisterNodeManagerRequestRegisterNodeManagerResponseNodeHeartbeatRequest9 E' a3 \6 ~" m/ F9 K
    NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto
    ( J  n+ p/ @  l" p# D- J0 p5 W件) :1 I' M* D& [" `" c7 B6 a
    import "yarn_protos.proto";
    # }3 R2 ?* O' x* m1 oimport "yarn_server_common_protos.proto";# L! p) d3 y) }! T$ i
    message RegisterNodeManagerRequestProto {
    ( @! S; _  L" K% Woptional NodeIdProto node_id = 1;* x6 H+ P( f5 W0 A4 ~: [
    optional int32 http_port = 3;( h; P& c8 X1 _3 ~
    optional ResourceProto resource = 4;  H8 D3 l; m- x2 V/ r" G
    } m9 ?; [: @) s7 Q7 }8 m
    essage RegisterNodeManagerResponseProto {1 ~) T7 T: j* s/ ?1 v
    optional MasterKeyProto container_token_master_key = 1;
      G  b- b* v5 j2 a6 aoptional MasterKeyProto nm_token_master_key = 2;
    2 n9 u* \- D2 Z3 |7 G2 o# Moptional NodeActionProto nodeAction = 3;; @% c. T4 e- Q; Z( x: k
    optional int64 rm_identifier = 4;
    ! ^' M! H5 ^' B: A8 x- }optional string diagnostics_message = 5;7 c5 {4 w1 }1 ]- l- i0 U* y: l, l9 S
    }.% s3 T7 v# z& K& t/ B, ]: ]
    .. //其他几个参数和返回值的定义
    / u, J/ H* M5 I
    步骤4 RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以
    1 U# ~$ \1 m% |; l* W+ d% y原生态
    .proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol( P. K& f3 H. u' P
    Buffers
    生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数/ a# x! x9 ]0 s7 f
    RegisterNodeManagerRequest为例进行说明。3 M5 e% d6 t$ p9 R& h2 ]5 f
    Java接口定义如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords) :, j2 k9 E* x# ~# q, i
    public interface RegisterNodeManagerRequest {
    " B+ ?" {. Q6 c0 j, w7 B1 iNodeId getNodeId();: A2 w: j: L: C5 C% H$ |+ F% ]) f
    int getHttpPort();7 N9 t9 B9 }& U4 W# ^# c
    Resource getResource();
    0 M0 k% f- }* {7 [  S0 E$ rvoid setNodeId(NodeId nodeId);2 j) M4 x5 `0 _
    void setHttpPort(int port);
    4 z7 {' K$ y) K6 ]1 Nvoid setResource(Resource resource);
    ! p) x/ D: E9 o5 m9 I}
    6 B3 c5 {- g# H- i3 F, x9 n9 ~8 u. EJava封装如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :: i* V' M" J* ~5 q& j5 v) J
    public class RegisterNodeManagerRequestPBImpl extends
    ( X% B3 ?; Q& y# p! ^5 DProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {9 Y6 w$ G8 U: D, G5 n+ t& i. P2 v
    RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
      `7 C7 k( A- J! V$ CRegisterNodeManagerRequestProto.Builder builder = null;
    8 D$ m" `9 A+ z6 p/ }5 o! J- }private NodeId nodeId = null;
    " ?4 u$ y$ D4 T  d. l8 N...
    2 Y0 Q# ?' \( y3 }& f: e@Override$ i; w9 r9 T, W$ \3 }
    public NodeId getNodeId() {- F+ X/ E0 L. B
    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;) a) l1 m$ W" Z2 h6 w* @# q
    if (this.nodeId != null) {# y2 k$ P9 T! w9 B: f/ @- u( h! D
    return this.nodeId;' Q7 Z: i) P% G0 A
    }i8 v1 E) E" Z" C4 ^' H4 }; u: j$ b
    f (!p.hasNodeId()) {
    ' J8 ]5 W7 Z. {& i8 }: `- w- p# dreturn null;4 J& u& w: L; v% u: ?7 @3 _" C1 Z0 L7 y4 o
    } t8 e, d) e* o, ]% t9 {2 c- K
    his.nodeId = convertFromProtoFormat(p.getNodeId());! q* g( N$ c" f
    return this.nodeId;
    / J$ ~( B8 o4 z4 `} @6 \8 c9 {3 t7 ~% v9 }2 X5 f0 x
    Override
    # S, }5 ?+ V1 e* Y( X0 k1 Apublic void setNodeId(NodeId nodeId) {8 n7 M) w: x( C( k4 c% ]1 f. f8 h
    maybeInitBuilder();, J0 j# N3 M& h& k* O6 ?) I
    if (nodeId == null)9 C! F3 A! ?0 r5 R
    builder.clearNodeId();
    7 M1 b. Z+ ]! l3 h) i, e% L9 b7 Cthis.nodeId = nodeId;
    , P8 `# Q0 X  Q$ J- j% [} .
    ) b) S' J/ x, H: N..8 w' w3 S3 {2 v" ^; @* r- z
    }7 o. V* h2 B& ]9 K6 V& P: q
    步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为
    3 Z: _: ]% b  A! K. b
    ResourceTrackerPBClientImpl, 实现如下:) e/ o5 M& M, B* F6 a
    public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {. V. V; y3 h" j* z9 C& V+ G1 Z
    private ResourceTrackerPB proxy;7 k6 c8 n" I5 N3 j9 j* `7 o
    public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
    9 T9 V3 t: p9 ]3 ~6 [/ p# `6 fRPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);/ @# n6 m, c! W  ]3 b
    proxy = (ResourceTrackerPB)RPC.getProxy(
    ' G( b) V  D$ B# V7 hResourceTrackerPB.class, clientVersion, addr, conf);% z. Z3 T% h( L6 e
    } @
    6 H+ M) Q* w" |9 @0 EOverride0 ?" m2 x9 @; f' V9 S9 s
    public RegisterNodeManagerResponse registerNodeManager(
    * Y$ ~8 Y6 h+ j, [, H3 O3 oRegisterNodeManagerRequest request) throws YarnException,
    ) d) f* m3 b! k  L2 b; N$ c& h. PIOException {
    * j4 w5 `) [' hRegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
    ; Y- [' S' |% o; n  B! t- y; xtry {5 x2 n( I9 d0 i/ N9 r8 ?+ g
    return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));4 G9 e+ U- k6 V3 ]: x
    } catch (ServiceException e) {
    ! s" z4 O6 f* }9 @3 Q; KRPCUtil.unwrapAndThrowException(e);
    7 ^3 h+ P6 N1 m) N: `return null;% K" b- N) V8 _% M
    }
    . o# r' Y! k5 H$ ]} .
    0 I/ B& Z0 [5 p3 ]. A..  `$ o7 U/ F$ S0 R6 b  }6 k% T
    }
    ; _( _) _  \' ^5 Q6 S3 s" S服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:
    , v, A" N3 T& c0 t# _# F
    public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {+ }+ S+ O3 E2 W8 y) a
    private ResourceTracker real;
    % O0 x7 M$ `) y% G* ?* P4 Spublic ResourceTrackerPBServiceImpl(ResourceTracker impl) {3 I) ?8 w: p: ^% q6 A
    this.real = impl;
    / t' d5 H4 S. j2 t, W) b* t# V# @} @
    7 P8 s- u6 s8 B; Y- @9 P6 E+ N2 ZOverride; M1 E6 ]! R) f0 }; s! U! M+ s
    public RegisterNodeManagerResponseProto registerNodeManager(
    $ [; t/ B$ x3 ]8 SRpcController controller, RegisterNodeManagerRequestProto proto)+ s2 F. ?8 i4 j! }# I
    throws ServiceException {
    6 ]4 y- d' ?( d2 U* yRegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);) H3 p) R! ?5 I6 J$ G' {1 n: c. ~
    try {
    : W: V$ Q5 n9 }1 u1 l+ JRegisterNodeManagerResponse response = real.registerNodeManager(request);
    % v$ r$ ]8 p2 w+ I2 [2 n5 m3 ]+ m% zreturn ((RegisterNodeManagerResponsePBImpl)response).getProto();0 i5 x% w5 r0 p# [9 y
    } catch (YarnException e) {
    % M. f+ L9 ?  g  v+ p4 e3 sthrow new ServiceException(e);, Z& R% m9 ~4 f3 T' u4 K
    } catch (IOException e) {! _8 j( s; L0 E' T& L
    throw new ServiceException(e);" ?( y, \0 G) `* M, M( O/ D
    }1 b- V6 V( z6 x) u4 W
    } .; h* |0 t. v, S. x! e3 x& S+ \
    ..* N* l4 ?* e/ @5 x
    }* C. N( }9 S4 @7 |
    总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTrackerYARN实现了一系列# x. l  K, `+ x  k$ a
    Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。
    " x8 f$ |8 @! E* X4 C6 T# X8 E; b, ?3-12 YARN RPC中的Protocol Buffers封装/ p3 m; s, }2 b; t5 t3 A) K6 y% F
    [6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call+ G2 a1 V- Z8 b* Z) I
    [7] Doug CuttingHadoop最初设计时就是这样描述Hadoop RPC设计动机的。1 g8 u5 R5 b: H7 E
    [8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。! Y: f1 W" B. A: B3 K; ^2 j
    [9] 参见网址http://thrift.apache.org/
    4 ?( {  \, i, d% [1 g; \
    [10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns! D/ [4 o& g+ o7 b
    [11] AvroRpcEngineHadoop 0.21.0版本开始出现。$ M" Q# E3 E: l. L0 w  G0 p$ G
    [12] ProtobufRpcEngineHadoop 2.0-apha版本开始出现。
    / n, M8 F# r# v+ W9 L- b
    [13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347: g: w) D" i8 k5 Y: K3 P
    [14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像ThriftAvro那样支持多语言编程, 但引入Protocol Buffers序列化框架则- g6 k; j1 K- H
    使其向前迈进了一步。
      
    * g' K. N! P3 C8 }6 ^
    3 I8 a, b3 l2 I( E. i3 C
    & T& z% b% ^/ y$ K7 c+ [, R
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-4-25 01:27 , Processed in 0.106123 second(s), 29 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表