|
3.3.5 Hadoop RPC类详解9 O; G" {6 y6 W6 V5 H2 H \* }1 r
Hadoop RPC主要由三个大类组成, 即RPC、 Client和Server, 分别对应对外编程接口、 客户端实现和服务器实现。. [ k8 x5 x/ u, u$ |+ m
1.ipc.RPC类分析
2 e0 E7 S) m3 }8 V+ GRPC类实际上是对底层客户机–服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。
. h# |5 ]) Q8 ?; ]! {. c8 H8 N, o如图3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxy和waitForProxy两类, 销毁方只有一
- _8 l1 i) F- u; X* r2 R& a个, 即为stopProxy。 RPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户 S& u+ A1 Y z3 l B/ d
设置一些基本的参数, 比如RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用
* _3 F" l3 K# H y$ M* vRPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。 i/ k6 U9 \5 x. l- c2 c! o( G* y1 f
与Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers) |( Z% O5 c/ ]( H, k
等, 目前提供了Writable( WritableRpcEngine) 和Protocol Buffers( ProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
$ s1 n) ~8 {( j6 ]/ L调用RPC.setProtocolEngine(…)修改采用的序列化方式。' g# H) h/ D& \5 N
下面以采用Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用$ h- N5 O( Z5 ], c& T
了Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可
. l z* M6 ]3 ~4 y2 d完成动态代理类对象上的方法调用。 但对于Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机
; Q: d1 k+ S; A7 J# a程序那样直接在invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
/ o9 F9 S3 ?' h1 u9 ^4 [打包成可序列化的WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,& ]+ Q! @- ~! f' q$ c: h E3 b
函数参数列表等信息, 利用Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。
" g: [, Z. @5 ?7 [& p# K图3-4 HadoopRPC的主要类关系图 t9 S3 |6 _+ o9 u9 B4 G+ H' z# E
图3-5 HadoopRPC中服务器端动态代理实现类图* ?8 x! e7 j5 b& f; @3 t+ ]) ]2 J
2.ipc.Client
4 v: ?# L9 a3 qClient主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执8 `% g! c0 ?$ ]3 |/ T8 D
行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
4 Z8 }; K. ?" |( i" _. m# cpublic Writable call(Writable param, ConnectionIdremoteId)2 d N. Z4 d% z6 }. z0 I8 x% w
throws InterruptedException, IOException;, a2 Q8 ^# d% u" E3 g. p# S
图3-6 Client类图
3 C: }# Y+ Q- a" J8 m' X" J+ N) E9 @Client内部有两个重要的内部类, 分别是Call和Connection。, \ X- ]2 q0 @4 t+ o" [
❑Call类 : 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出% Q* F5 i- w# S E4 r
错或者异常信息error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺
& f( G# ]7 j& i7 `% U# T7 ^序与结果返回顺序无直接关系, 而Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id和9 l; x& M6 o$ y) u+ a
param两个变量, 而剩下的3个变量( value、 error和done) 则由服务器端根据函数执行情况填充。
# s9 N7 [3 e9 K! ] f8 u) H4 l# Y9 }❑Connection类 : Client与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本) a- U" S" j+ F8 s0 u% d
信息主要包括通信连接唯一标识( remoteId) 、 与Server端通信的Socket( socket) 、 网络输入数据流( in) 、 网络输出数据流
" ~' P9 s$ |, f5 i( out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
0 i" K& [( o) D1 j# e, n6 i❍addCall—将一个Call对象添加到哈希表中;
9 k7 Q* J% d# A$ \, r: |2 U❍sendParam—向服务器端发送RPC请求;
0 ~( a0 J) U" c❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
4 M: r5 Q* ~* o+ X8 J❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。5 p- w) }2 S" p8 }
当调用call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。
( V7 z$ X5 Z3 Y" c; K2 {1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;
+ _& f3 V$ X6 H5 ~" m+ C2 \$ [2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;
3 t" f0 v1 c! J: o3) Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;0 s' b# ^2 m/ t$ ]8 S, \8 F
4) Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。 O6 }# g* m$ L0 G
图3-7 Hadoop RPC Client处理流程
( j, P' d- L/ l1 ?1 C5 }! y; N3.ipc.Server类分析: z9 {+ a( Q: g9 a, C( y: O
Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNode或 JobTracker [8] , 这是制约系统性能和可扩展3 @7 T5 l6 ?) ~2 {% P
性的最关键因素之一; 而Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计" l. F5 k/ X; E
目标。 为此, ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用& ^* }3 v1 S' s* n( q
了JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。# k! D% Y2 G# N h: y8 u! m. y' L
Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性; t4 H. P" k. z0 T/ E8 |
能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的Reactor实现原理如图3-8所示。- v' `# B' W9 w; G: ?3 e* P' B
图3-8 Reactor模式工作原理
q! S: s0 R8 @+ ?- Y典型的Reactor模式中主要包括以下几个角色。
' V9 D, P7 O6 i4 c! z/ W( B- o0 Z❑Reactor: I/O事件的派发者。/ ~+ [ c7 L Y- U
❑Acceptor: 接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler。
- B/ }, U' J' v, t9 l6 i❑Handler: 与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽: W8 `$ {: `3 d/ O2 G
象诸如read、 decode、 compute、 encode和send等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适
9 [8 e. w2 g. Y9 L' Z) I+ t6 d& @当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次I/O事件到来的时候( 另一半可读) 能继续上次中断
+ _' A* R. ~) }- h8 o1 F4 ?3 m7 W! L的处理。
4 e7 t) e% L1 t, W8 ~5 T. D❑Reader/Sender: 为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线
- I/ q7 r0 P3 A2 o8 a, x程池中等待后续处理即可。 为此, Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对
3 c* c3 h% z2 L) s& _- {- p应的Reader和Sender线程处理。
( u( b4 I+ z$ b3 Q$ w Kip
+ X, q2 J) u5 w! M# d: uc.Server实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易
" |2 ?7 B8 y) J; |3 `: L地学习ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。& m; p" \8 \0 \8 V2 i* l9 Q
前面提到, ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为8 ]3 \. l( x6 h# c, j
此, ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。/ j1 K9 H; B1 L/ f$ s
图3-9 Hadoop RPC Server处理流程
' `6 B% R! ]4 q n. N2 {( 1) 接收请求
! a0 I+ y |- J/ q, q: r r: ^该阶段主要任务是接收来自各个客户端的RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue)4 x% b+ }8 N% D! o
中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由Listener和Reader两种线程完成。
$ |5 c7 M$ K+ D. r: A1 L5 U4 A整个Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程6 x* T. p9 o; I6 T8 f* c8 h/ B( ~
池中选择一个Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个) G1 v0 }. h; v1 E& ]
Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。
; J- V+ D, j( }# O. z! JListener和Reader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。5 J4 m# B( D; D7 t& K& i, z
对于Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
" n5 M e, W" V% N; ~4 }' PReader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call对9 e5 v/ K- c- d% p0 M2 N! k, ^
象, 放到共享队列callQueue中。
! f3 G: D( H* c! T! ?( 2) 处理请求
6 n4 V4 h' ~6 G3 h5 e( |+ T+ l' \: G1 f该阶段主要任务是从共享队列callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线
K! a$ `$ }8 `1 k4 `. A程完成。! @. v% B7 d% o1 ~
Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果; d9 E5 _2 N" B0 u
返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时" }; d2 |+ p) N) d2 b- j
Handler将尝试着将后续发送任务交给Responder线程。0 a! I! `* W- U0 W
( 3) 返回结果
$ e/ ]# L: v% C/ q# D# L前面提到, 每个Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结) z/ |& h+ I! S Y: O$ O8 V
果过大或者网络异常情况( 网速过慢) , 会将发送任务交给Responder线程。5 \9 o4 { b- ~4 o, v1 T
Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将
- c% i g. k9 T; H结果一次性发送到客户端时, 会向该Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送$ w) w/ k9 S$ m& a( P& O r
未发送完成的结果。
" `, ^: m" Z5 G9 x m% Z8 S3.3.6 Hadoop RPC参数调优: Y5 \9 i# y, H/ S2 h- G
Hadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。
% f7 |! t& Z( c: \❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个
5 m6 ]8 R+ \+ j" b4 cReader线程。
! j+ P( x# i- `; c❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
x1 r3 V8 `; Q3 R' c' t6 {$ vHandler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:
5 i. \ _. I( ^, ^100×10=1000。! q2 m2 C) l6 U8 q; p+ c; E, F
❑Handler线程数目。 在Hadoop中, ResourceManager和NameNode分别是YARN和HDFS两个子系统中的RPC Server, 其对应的9 F: ~9 v/ M+ a" [$ A5 C
Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-count和dfs.namenode.service.handler.count指定, 默认值分别为
! R' g4 u3 m. |50和10, 当集群规模较大时, 这两个参数值会大大影响系统性能。7 R/ U* b7 z' x7 s9 m5 c8 s' A# o3 j
❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可
( t# S: g/ Q5 v- n+ P$ G% v能不利于对实时性要求较高的应用。 客户端最大重试次数由参数ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试
5 A6 ?9 a7 F" h1 N# P" r1 C10次( 每两次之间相隔1秒) 。
! R* u4 z: J4 M& W. ~/ k$ @3.3.7 YARN RPC实现! H/ w6 U. E( P$ Q$ r
当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] 、 Protocol Buffers和Avro。 同Hadoop RPC一样, 它们均由两部分组
( g8 P1 l4 M. Z成: 对象序列化和远程过程调用( Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]9 C/ k g! {: V# o/ b
) 。 相比于Hadoop RPC, 它们有以下几个特点:6 h- c0 @* Z# `5 S0 Z* U: B7 x
❑跨语言特性 。 前面提到, RPC框架实际上是客户机–服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用
% j! i) A; ^ i' o7 QJava语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户$ Z0 s1 I7 F Z' L- o- p3 Q" b
端和服务器端可采用任何语言编写, 如Java、 C++、 Python等, 这给用户编程带来极大方便。5 J8 Y- f1 Z- p
❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description Language, IDL) , 它提供一套通用的数据类型,
7 e8 R0 R; N$ ?0 ]6 W+ S并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照IDL定义的语法编写完接口文件后, 可根据实际应 w5 c0 ]1 a$ J3 `: P
用需要生成特定编程语言( 如Java、 C++、 Python等) 的客户端和服务器端代码。
: [, f8 P1 R( Q* T❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者2 S2 G$ q2 L7 s) N6 O. J
删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。. ~9 m' k z |) S
随着Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:
2 p; Z/ A* g* V5 T9 q❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言! j- u; l1 e4 e& n. [
读写HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。& ^3 b9 c6 w$ |/ ?- h% d T
❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
/ q- H- o" @4 F5 L9 x. V果用户企图这样做, 会抛出VersionMismatch异常。
* X+ D9 ^: R$ z. A2 F) Z为了解决以上几个问题, Hadoop YARN将RPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之
$ I* Q' r7 s8 a1 _后, Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源0 M8 `: C- Y2 P! `" d
RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的2 n- @( v: z! b7 A
RPC, 而 AvroRpcEngine [11] 和 ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache Avro和Protocol Buffers对应的4 d( D9 m2 b k J6 g; n( i
RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现
! Z; B7 ~6 O1 x% K! @* d; ~2 c中, Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。
2 C& M* U5 Y! H; d' V2 `YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信. h* t+ I. E! d4 x
协议。 YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是
/ c! o" i8 O5 h4 A6 r( K( r0 Iorg.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC。 HadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider生( W, `: d, a% R: P) F `
成客户端工厂( 由参数yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
" k- \" Y# _8 E) i' [9 N8 p6 p2 y务器工厂( 由参数yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根
4 R: B2 f' F! _( a9 m& j' I据通信协议的Protocol Buffers定义生成客户端对象和服务器对象。
* p. I. ?8 N- A( L3 v1 W z' l8 W图3-10 Hadoop RPC 集成多种开源RPC 框架5 F2 B( P y3 P% M2 y
图3-11 YarnRPC 相关类图/ a" l" X" o" M; E d
❑RpcClientFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但
* H/ J" c/ [0 E: e3 g它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于* K$ y$ ^6 J; W. N
Java包XxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前
, i: p9 n; R% D, o缀"PBClientImpl") 。
A7 {# c9 }: D% W' @* j& t❑RpcServerFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄
6 s0 c# |% y) }: d. d6 z& H(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java包. y/ r: U+ ^1 z Q/ p/ x/ M" c
名为XxxPackage, 则客户端实现代码必须位于Java包XxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现
& K5 I2 F R& I/ V: |类名为PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。
* H% h. Y9 ] vHadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以
' q4 X7 t2 e; }/ b* s. w下几个方面:
; o2 w, k1 v0 h3 v8 u; M; k❑继承了Protocol Buffers的优势 。 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允
$ q! i- N) ?- t' ?许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用
* L7 I2 [1 f0 K/ Y. M. N户为某些服务(比如HDFS的NameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol Buffers比Hadoop 自带的Writable在性能
1 a2 f2 A2 R: l8 [/ {, P方面有很大提升。6 l6 v7 `1 f9 Y" ^* ~# ^ N$ ]
❑支持升级回滚 。 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为Active和Standby两种角色,$ r/ [( `8 Y5 M z
其中, Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol2 K" s; `+ G, w% R
Buffers序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备0 [ v+ ]3 G! F4 t" E8 [
NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。1 I# }& o9 S; w+ K( D$ _
3.3.8 YARN RPC应用实例8 c. ]/ z! B" D3 l
为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。. d# [3 ?/ d( s1 {7 W! _
在YARN中, ResourceManager和NodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户
- n3 T L7 T. s8 L7 Q端, ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManager和nodeHeartbeat) 向+ a: a. S! P9 A4 u+ j$ h
ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:
; T% I/ j3 v5 r6 A// ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server- v1 P3 g0 W) c3 Z) ]
public class ResourceTrackerService extends AbstractService implements
7 @4 Q! }# `# m8 T) V& h) l. }ResourceTracker {
3 i6 x9 I2 M% L" Lprivate Server server;
n% A+ D/ ?7 r0 A9 }...
8 F5 I/ e- x( I& h8 n0 Z, E1 Vprotected void serviceStart() throws Exception {9 T% p5 t% ~) T3 z2 O" ]
super.serviceStart();
! I1 R, Y8 Q3 V5 h& @Configuration conf = getConfig();* ^9 r; \2 A/ V+ ~9 U: L( H
YarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC类
# j; u/ |( K% ~' {this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
. M3 U" s+ o* ~$ _8 Y1 ?% Z& O1 Yconf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,' j* E4 z9 J0 W7 a/ D' r, Z5 m
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
" F M9 L/ a* O1 k3 K9 x/ w, Nthis.server.start();
5 |" f8 Y( t+ j}.
1 @! m3 E1 w7 T+ N! d, Y..
4 \, t u# v/ E8 I/ }1 {@Override2 K) w" x4 H5 M- ]
public RegisterNodeManagerResponse registerNodeManager(- D3 W. F% v/ ^3 C6 @ c+ B1 R
RegisterNodeManagerRequest request) throws YarnException,3 J. d& i7 ~) u1 x; b+ S* O+ z7 k3 ]
IOException {) I3 P, c( b$ G. ?5 ?# B0 z: f. i/ s
//具体实现
; ^7 M( G$ ?1 J6 W' W}@1 K4 G; [ n& r, o7 F) T8 I" ^
Override
* o& Q' D3 e# m; jpublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request), F9 _+ r9 ~+ V2 p' ~
throws YarnException, IOException {8 `2 H/ X/ O) a
//具体实现
; R$ o% v7 Q; C' K; h4 z: F}. b" ^5 k% c2 V. r% T, o: v
}- O) m) c2 V. p+ ]4 P
NodeManager(客户端) 中的相关代码如下。
# p$ d$ q5 @& m- B& a/ j# w/ C// 该函数是从YARN源代码中简单修改而来的
' a# n j5 ~7 D' M( kprotected ResourceTracker getRMClient() throws IOException {
4 @3 U& z* s% sConfiguration conf = getConfig();
5 [) U0 p& m( i5 ~; Y d/ lInetSocketAddress rmAddress = getRMAddress(conf, protocol);3 n1 [: e3 {# [: V' y( U3 F
RetryPolicy retryPolicy = createRetryPolicy(conf);$ N9 I; m4 ]' B/ ~7 C
ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);9 q6 Y' W9 @) i! Y4 |7 o8 B1 P
LOG.info("Connecting to ResourceManager at " + rmAddress);& D$ _6 F5 f7 Y
return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);
& _. I- b. c; K) ?) y8 Q. v8 c; V}.# ?% J, C* H5 C1 R
..
( |) P. v# q h; M9 i' o# kthis.resourceTracker = getRMClient();; I. {, `' [: ]0 D$ b& k$ B
...
# @% S/ Q [* Q) j# o9 l- A/ O" f. X( IRegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
0 X- ?7 Y1 B: v$ r3 V8 ^...% Y L5 x1 X, Z3 G, O
response = resourceTracker.nodeHeartbeat(request);
) s: |6 d2 J0 k$ `为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。
# o7 d% G/ w! f" J; s步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManager和nodeHeartbeat
' L7 n4 l6 c$ ~两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:6 v9 ^* A/ j9 i) p) m A. p
public interface ResourceTracker {
8 o1 l6 \( Z( Epublic RegisterNodeManagerResponse registerNodeManager(& K1 Q$ z* U# L# p3 O2 [- q
RegisterNodeManagerRequest request) throws YarnException, IOException;
! [: z6 Z% e, c( L( o6 \, U1 g6 Fpublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
. f6 y8 l: o) C# k v+ hthrows YarnException, IOException;
8 L7 O! t' X/ E1 F- }* Q- ? ]}
5 w. {+ m" V, @0 Y I% P- n9 N0 J6 G步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但
6 p7 |/ J" Q: u- r1 e- G未提供RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的
" T8 ?. d3 V, Z% \$ i% c5 g/ ~) a# OProtocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:2 Q3 M# \6 V: H, o* \, d) E
option java_package = "org.apache.hadoop.yarn.proto";
( P* \9 }* p- o3 K y% coption java_outer_classname = "ResourceTracker";, i6 d5 j/ E% B3 F o
option java_generic_services = true;$ l: w- Q: `& y% N3 h4 B
option java_generate_equals_and_hash = true;4 s) c/ |0 c% W; W, s* I
import "yarn_server_common_service_protos.proto";
$ C$ I& V1 g) G3 a5 r: b7 wservice ResourceTrackerService {
: g# F7 r* |0 i' Xrpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);5 S# f0 w( V5 D; Z- y
rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
& Q4 M" r- l) n+ U}
) }. B( [" y& H( P) lResourceTracker的RPC函数实现是由ResourceManager中的ResourceTrackerService完成的。8 \7 ^+ m0 v5 q. R! j
步骤3 为RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol% | |1 s. X" G" D( N+ E0 `" d
Buffers定义的, 因此ResourceTracker协议中RegisterNodeManagerRequest、 RegisterNodeManagerResponse、 NodeHeartbeatRequest和9 E' a3 \6 ~" m/ F9 K
NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto文
( J n+ p/ @ l" p# D- J0 p5 W件) :1 I' M* D& [" `" c7 B6 a
import "yarn_protos.proto";
# }3 R2 ?* O' x* m1 oimport "yarn_server_common_protos.proto";# L! p) d3 y) }! T$ i
message RegisterNodeManagerRequestProto {
( @! S; _ L" K% Woptional NodeIdProto node_id = 1;* x6 H+ P( f5 W0 A4 ~: [
optional int32 http_port = 3;( h; P& c8 X1 _3 ~
optional ResourceProto resource = 4; H8 D3 l; m- x2 V/ r" G
} m9 ?; [: @) s7 Q7 }8 m
essage RegisterNodeManagerResponseProto {1 ~) T7 T: j* s/ ?1 v
optional MasterKeyProto container_token_master_key = 1;
G b- b* v5 j2 a6 aoptional MasterKeyProto nm_token_master_key = 2;
2 n9 u* \- D2 Z3 |7 G2 o# Moptional NodeActionProto nodeAction = 3;; @% c. T4 e- Q; Z( x: k
optional int64 rm_identifier = 4;
! ^' M! H5 ^' B: A8 x- }optional string diagnostics_message = 5;7 c5 {4 w1 }1 ]- l- i0 U* y: l, l9 S
}.% s3 T7 v# z& K& t/ B, ]: ]
.. //其他几个参数和返回值的定义
/ u, J/ H* M5 I步骤4 为RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以
1 U# ~$ \1 m% |; l* W+ d% y原生态.proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol( P. K& f3 H. u' P
Buffers生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数/ a# x! x9 ]0 s7 f
RegisterNodeManagerRequest为例进行说明。3 M5 e% d6 t$ p9 R& h2 ]5 f
Java接口定义如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords) :, j2 k9 E* x# ~# q, i
public interface RegisterNodeManagerRequest {
" B+ ?" {. Q6 c0 j, w7 B1 iNodeId getNodeId();: A2 w: j: L: C5 C% H$ |+ F% ]) f
int getHttpPort();7 N9 t9 B9 }& U4 W# ^# c
Resource getResource();
0 M0 k% f- }* {7 [ S0 E$ rvoid setNodeId(NodeId nodeId);2 j) M4 x5 `0 _
void setHttpPort(int port);
4 z7 {' K$ y) K6 ]1 Nvoid setResource(Resource resource);
! p) x/ D: E9 o5 m9 I}
6 B3 c5 {- g# H- i3 F, x9 n9 ~8 u. EJava封装如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :: i* V' M" J* ~5 q& j5 v) J
public class RegisterNodeManagerRequestPBImpl extends
( X% B3 ?; Q& y# p! ^5 DProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {9 Y6 w$ G8 U: D, G5 n+ t& i. P2 v
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
`7 C7 k( A- J! V$ CRegisterNodeManagerRequestProto.Builder builder = null;
8 D$ m" `9 A+ z6 p/ }5 o! J- }private NodeId nodeId = null;
" ?4 u$ y$ D4 T d. l8 N...
2 Y0 Q# ?' \( y3 }& f: e@Override$ i; w9 r9 T, W$ \3 }
public NodeId getNodeId() {- F+ X/ E0 L. B
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;) a) l1 m$ W" Z2 h6 w* @# q
if (this.nodeId != null) {# y2 k$ P9 T! w9 B: f/ @- u( h! D
return this.nodeId;' Q7 Z: i) P% G0 A
}i8 v1 E) E" Z" C4 ^' H4 }; u: j$ b
f (!p.hasNodeId()) {
' J8 ]5 W7 Z. {& i8 }: `- w- p# dreturn null;4 J& u& w: L; v% u: ?7 @3 _" C1 Z0 L7 y4 o
} t8 e, d) e* o, ]% t9 {2 c- K
his.nodeId = convertFromProtoFormat(p.getNodeId());! q* g( N$ c" f
return this.nodeId;
/ J$ ~( B8 o4 z4 `} @6 \8 c9 {3 t7 ~% v9 }2 X5 f0 x
Override
# S, }5 ?+ V1 e* Y( X0 k1 Apublic void setNodeId(NodeId nodeId) {8 n7 M) w: x( C( k4 c% ]1 f. f8 h
maybeInitBuilder();, J0 j# N3 M& h& k* O6 ?) I
if (nodeId == null)9 C! F3 A! ?0 r5 R
builder.clearNodeId();
7 M1 b. Z+ ]! l3 h) i, e% L9 b7 Cthis.nodeId = nodeId;
, P8 `# Q0 X Q$ J- j% [} .
) b) S' J/ x, H: N..8 w' w3 S3 {2 v" ^; @* r- z
}7 o. V* h2 B& ]9 K6 V& P: q
步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为
3 Z: _: ]% b A! K. bResourceTrackerPBClientImpl, 实现如下:) e/ o5 M& M, B* F6 a
public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {. V. V; y3 h" j* z9 C& V+ G1 Z
private ResourceTrackerPB proxy;7 k6 c8 n" I5 N3 j9 j* `7 o
public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
9 T9 V3 t: p9 ]3 ~6 [/ p# `6 fRPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);/ @# n6 m, c! W ]3 b
proxy = (ResourceTrackerPB)RPC.getProxy(
' G( b) V D$ B# V7 hResourceTrackerPB.class, clientVersion, addr, conf);% z. Z3 T% h( L6 e
} @
6 H+ M) Q* w" |9 @0 EOverride0 ?" m2 x9 @; f' V9 S9 s
public RegisterNodeManagerResponse registerNodeManager(
* Y$ ~8 Y6 h+ j, [, H3 O3 oRegisterNodeManagerRequest request) throws YarnException,
) d) f* m3 b! k L2 b; N$ c& h. PIOException {
* j4 w5 `) [' hRegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
; Y- [' S' |% o; n B! t- y; xtry {5 x2 n( I9 d0 i/ N9 r8 ?+ g
return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));4 G9 e+ U- k6 V3 ]: x
} catch (ServiceException e) {
! s" z4 O6 f* }9 @3 Q; KRPCUtil.unwrapAndThrowException(e);
7 ^3 h+ P6 N1 m) N: `return null;% K" b- N) V8 _% M
}
. o# r' Y! k5 H$ ]} .
0 I/ B& Z0 [5 p3 ]. A.. `$ o7 U/ F$ S0 R6 b }6 k% T
}
; _( _) _ \' ^5 Q6 S3 s" S服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:
, v, A" N3 T& c0 t# _# Fpublic class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {+ }+ S+ O3 E2 W8 y) a
private ResourceTracker real;
% O0 x7 M$ `) y% G* ?* P4 Spublic ResourceTrackerPBServiceImpl(ResourceTracker impl) {3 I) ?8 w: p: ^% q6 A
this.real = impl;
/ t' d5 H4 S. j2 t, W) b* t# V# @} @
7 P8 s- u6 s8 B; Y- @9 P6 E+ N2 ZOverride; M1 E6 ]! R) f0 }; s! U! M+ s
public RegisterNodeManagerResponseProto registerNodeManager(
$ [; t/ B$ x3 ]8 SRpcController controller, RegisterNodeManagerRequestProto proto)+ s2 F. ?8 i4 j! }# I
throws ServiceException {
6 ]4 y- d' ?( d2 U* yRegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);) H3 p) R! ?5 I6 J$ G' {1 n: c. ~
try {
: W: V$ Q5 n9 }1 u1 l+ JRegisterNodeManagerResponse response = real.registerNodeManager(request);
% v$ r$ ]8 p2 w+ I2 [2 n5 m3 ]+ m% zreturn ((RegisterNodeManagerResponsePBImpl)response).getProto();0 i5 x% w5 r0 p# [9 y
} catch (YarnException e) {
% M. f+ L9 ? g v+ p4 e3 sthrow new ServiceException(e);, Z& R% m9 ~4 f3 T' u4 K
} catch (IOException e) {! _8 j( s; L0 E' T& L
throw new ServiceException(e);" ?( y, \0 G) `* M, M( O/ D
}1 b- V6 V( z6 x) u4 W
} .; h* |0 t. v, S. x! e3 x& S+ \
..* N* l4 ?* e/ @5 x
}* C. N( }9 S4 @7 |
总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTracker, YARN实现了一系列# x. l K, `+ x k$ a
Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。
" x8 f$ |8 @! E* X4 C6 T# X8 E; b, ?图3-12 YARN RPC中的Protocol Buffers封装/ p3 m; s, }2 b; t5 t3 A) K6 y% F
[6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call。+ G2 a1 V- Z8 b* Z) I
[7] Doug Cutting在Hadoop最初设计时就是这样描述Hadoop RPC设计动机的。1 g8 u5 R5 b: H7 E
[8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。! Y: f1 W" B. A: B3 K; ^2 j
[9] 参见网址http://thrift.apache.org/。
4 ?( { \, i, d% [1 g; \[10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns。! D/ [4 o& g+ o7 b
[11] AvroRpcEngine从Hadoop 0.21.0版本开始出现。$ M" Q# E3 E: l. L0 w G0 p$ G
[12] ProtobufRpcEngine从Hadoop 2.0-apha版本开始出现。
/ n, M8 F# r# v+ W9 L- b[13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347。: g: w) D" i8 k5 Y: K3 P
[14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像Thrift和Avro那样支持多语言编程, 但引入Protocol Buffers序列化框架则- g6 k; j1 K- H
使其向前迈进了一步。
* g' K. N! P3 C8 }6 ^
3 I8 a, b3 l2 I( E. i3 C
& T& z% b% ^/ y$ K7 c+ [, R |
|