javazx 发表于 2017-4-17 13:56:45

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.3】part2

3.3.5 Hadoop RPC类详解
Hadoop RPC主要由三个大类组成, 即RPC、 Client和Server, 分别对应对外编程接口、 客户端实现和服务器实现。
1.ipc.RPC类分析
RPC类实际上是对底层客户机–服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。
如图3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxy和waitForProxy两类, 销毁方只有一
个, 即为stopProxy。 RPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户
设置一些基本的参数, 比如RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用
RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。
与Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers
等, 目前提供了Writable( WritableRpcEngine) 和Protocol Buffers( ProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
调用RPC.setProtocolEngine(…)修改采用的序列化方式。
下面以采用Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用
了Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可
完成动态代理类对象上的方法调用。 但对于Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机
程序那样直接在invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
打包成可序列化的WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,
函数参数列表等信息, 利用Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。
图3-4 HadoopRPC的主要类关系图
图3-5 HadoopRPC中服务器端动态代理实现类图
2.ipc.Client
Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执
行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
public Writable call(Writable param, ConnectionIdremoteId)
throws InterruptedException, IOException;
图3-6 Client类图
Client内部有两个重要的内部类, 分别是Call和Connection。
❑Call类 : 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出
错或者异常信息error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺
序与结果返回顺序无直接关系, 而Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id和
param两个变量, 而剩下的3个变量( value、 error和done) 则由服务器端根据函数执行情况填充。
❑Connection类 : Client与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
信息主要包括通信连接唯一标识( remoteId) 、 与Server端通信的Socket( socket) 、 网络输入数据流( in) 、 网络输出数据流
( out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
❍addCall—将一个Call对象添加到哈希表中;
❍sendParam—向服务器端发送RPC请求;
❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。
当调用call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。
1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;
2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;
3) Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;
4) Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。
图3-7 Hadoop RPC Client处理流程
3.ipc.Server类分析
Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNode或 JobTracker , 这是制约系统性能和可扩展
性的最关键因素之一; 而Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计
目标。 为此, ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用
了JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。
Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性
能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的Reactor实现原理如图3-8所示。
图3-8 Reactor模式工作原理
典型的Reactor模式中主要包括以下几个角色。
❑Reactor: I/O事件的派发者。
❑Acceptor: 接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler。
❑Handler: 与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽
象诸如read、 decode、 compute、 encode和send等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适
当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次I/O事件到来的时候( 另一半可读) 能继续上次中断
的处理。
❑Reader/Sender: 为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线
程池中等待后续处理即可。 为此, Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对
应的Reader和Sender线程处理。
ip
c.Server实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易
地学习ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。
前面提到, ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为
此, ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。
图3-9 Hadoop RPC Server处理流程
( 1) 接收请求
该阶段主要任务是接收来自各个客户端的RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue)
中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由Listener和Reader两种线程完成。
整个Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程
池中选择一个Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个
Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。
Listener和Reader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。
对于Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call对
象, 放到共享队列callQueue中。
( 2) 处理请求
该阶段主要任务是从共享队列callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线
程完成。
Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果
返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时
Handler将尝试着将后续发送任务交给Responder线程。
( 3) 返回结果
前面提到, 每个Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结
果过大或者网络异常情况( 网速过慢) , 会将发送任务交给Responder线程。
Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将
结果一次性发送到客户端时, 会向该Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送
未发送完成的结果。
3.3.6 Hadoop RPC参数调优
Hadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。
❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个
Reader线程。
❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
Handler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:
100×10=1000。
❑Handler线程数目。 在Hadoop中, ResourceManager和NameNode分别是YARN和HDFS两个子系统中的RPC Server, 其对应的
Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-count和dfs.namenode.service.handler.count指定, 默认值分别为
50和10, 当集群规模较大时, 这两个参数值会大大影响系统性能。
❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可
能不利于对实时性要求较高的应用。 客户端最大重试次数由参数ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试
10次( 每两次之间相隔1秒) 。
3.3.7 YARN RPC实现
当前存在非常多的开源RPC框架, 比较有名 的有Thrift 、 Protocol Buffers和Avro。 同Hadoop RPC一样, 它们均由两部分组
成: 对象序列化和远程过程调用( Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多
) 。 相比于Hadoop RPC, 它们有以下几个特点:
❑跨语言特性 。 前面提到, RPC框架实际上是客户机–服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用
Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
端和服务器端可采用任何语言编写, 如Java、 C++、 Python等, 这给用户编程带来极大方便。
❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description Language, IDL) , 它提供一套通用的数据类型,
并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照IDL定义的语法编写完接口文件后, 可根据实际应
用需要生成特定编程语言( 如Java、 C++、 Python等) 的客户端和服务器端代码。
❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者
删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。
随着Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:
❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言
读写HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。
❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
果用户企图这样做, 会抛出VersionMismatch异常。
为了解决以上几个问题, Hadoop YARN将RPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之
后, Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源
RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的
RPC, 而 AvroRpcEngine 和 ProtobufRpcEngine 分别是开源RPC( 或序列化) 框架Apache Avro和Protocol Buffers对应的
RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现
中, Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。
YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信
协议。 YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是
org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC。 HadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider生
成客户端工厂( 由参数yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
务器工厂( 由参数yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根
据通信协议的Protocol Buffers定义生成客户端对象和服务器对象。
图3-10 Hadoop RPC 集成多种开源RPC 框架
图3-11 YarnRPC 相关类图
❑RpcClientFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但
它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于
Java包XxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前
缀"PBClientImpl") 。
❑RpcServerFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄
(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java包
名为XxxPackage, 则客户端实现代码必须位于Java包XxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现
类名为PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。
Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以
下几个方面:
❑继承了Protocol Buffers的优势 。 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允
许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用
户为某些服务(比如HDFS的NameNode) 编写 非Java客户端 ; 此外, 实验表明Protocol Buffers比Hadoop 自带的Writable在性能
方面有很大提升。
❑支持升级回滚 。 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为Active和Standby两种角色,
其中, Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol
Buffers序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备
NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。
3.3.8 YARN RPC应用实例
为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。
在YARN中, ResourceManager和NodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户
端, ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManager和nodeHeartbeat) 向
ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:
// ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server
public class ResourceTrackerService extends AbstractService implements
ResourceTracker {
private Server server;
...
protected void serviceStart() throws Exception {
super.serviceStart();
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC类
this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
conf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
this.server.start();
}.
..
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
IOException {
//具体实现
}@
Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
//具体实现
}
}
NodeManager(客户端) 中的相关代码如下。
// 该函数是从YARN源代码中简单修改而来的
protected ResourceTracker getRMClient() throws IOException {
Configuration conf = getConfig();
InetSocketAddress rmAddress = getRMAddress(conf, protocol);
RetryPolicy retryPolicy = createRetryPolicy(conf);
ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);
LOG.info("Connecting to ResourceManager at " + rmAddress);
return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);
}.
..
this.resourceTracker = getRMClient();
...
RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
...
response = resourceTracker.nodeHeartbeat(request);
为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。
步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManager和nodeHeartbeat
两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:
public interface ResourceTracker {
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException, IOException;
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException;
}
步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但
未提供RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的
Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "ResourceTracker";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
import "yarn_server_common_service_protos.proto";
service ResourceTrackerService {
rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
}
ResourceTracker的RPC函数实现是由ResourceManager中的ResourceTrackerService完成的。
步骤3 为RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol
Buffers定义的, 因此ResourceTracker协议中RegisterNodeManagerRequest、 RegisterNodeManagerResponse、 NodeHeartbeatRequest和
NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto文
件) :
import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
message RegisterNodeManagerRequestProto {
optional NodeIdProto node_id = 1;
optional int32 http_port = 3;
optional ResourceProto resource = 4;
} m
essage RegisterNodeManagerResponseProto {
optional MasterKeyProto container_token_master_key = 1;
optional MasterKeyProto nm_token_master_key = 2;
optional NodeActionProto nodeAction = 3;
optional int64 rm_identifier = 4;
optional string diagnostics_message = 5;
}.
.. //其他几个参数和返回值的定义
步骤4 为RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以
原生态.proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol
Buffers生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数
RegisterNodeManagerRequest为例进行说明。
Java接口定义如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords) :
public interface RegisterNodeManagerRequest {
NodeId getNodeId();
int getHttpPort();
Resource getResource();
void setNodeId(NodeId nodeId);
void setHttpPort(int port);
void setResource(Resource resource);
}
Java封装如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :
public class RegisterNodeManagerRequestPBImpl extends
ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
RegisterNodeManagerRequestProto.Builder builder = null;
private NodeId nodeId = null;
...
@Override
public NodeId getNodeId() {
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.nodeId != null) {
return this.nodeId;
}i
f (!p.hasNodeId()) {
return null;
} t
his.nodeId = convertFromProtoFormat(p.getNodeId());
return this.nodeId;
} @
Override
public void setNodeId(NodeId nodeId) {
maybeInitBuilder();
if (nodeId == null)
builder.clearNodeId();
this.nodeId = nodeId;
} .
..
}
步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为
ResourceTrackerPBClientImpl, 实现如下:
public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {
private ResourceTrackerPB proxy;
public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
proxy = (ResourceTrackerPB)RPC.getProxy(
ResourceTrackerPB.class, clientVersion, addr, conf);
} @
Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
IOException {
RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
try {
return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
} .
..
}
服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:
public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {
private ResourceTracker real;
public ResourceTrackerPBServiceImpl(ResourceTracker impl) {
this.real = impl;
} @
Override
public RegisterNodeManagerResponseProto registerNodeManager(
RpcController controller, RegisterNodeManagerRequestProto proto)
throws ServiceException {
RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);
try {
RegisterNodeManagerResponse response = real.registerNodeManager(request);
return ((RegisterNodeManagerResponsePBImpl)response).getProto();
} catch (YarnException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
}
} .
..
}
总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTracker, YARN实现了一系列
Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。
图3-12 YARN RPC中的Protocol Buffers封装
参见网址http://en.wikipedia.org/wiki/Remote_procedure_call。
Doug Cutting在Hadoop最初设计时就是这样描述Hadoop RPC设计动机的。
HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。
参见网址http://thrift.apache.org/。
参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns。
AvroRpcEngine从Hadoop 0.21.0版本开始出现。
ProtobufRpcEngine从Hadoop 2.0-apha版本开始出现。
参见网址https://issues.apache.org/jira/browse/HADOOP-7347。
Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像Thrift和Avro那样支持多语言编程, 但引入Protocol Buffers序列化框架则
使其向前迈进了一步。


页: [1]
查看完整版本: 《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.3】part2