|
3.2 第三方开源库
: b- }! ~4 o. b& V% ?- S3.2.1 Protocol Buffers
9 C- [! I7 j* t W( |Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或- i; {) e$ ^1 k w1 d
RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持
# ~5 H+ I3 h+ v9 I5 X# F$ zC++、 Java、 Python三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers。
& W$ X" Z' o; u' _3 P4 |0 w相比于常见的XML格式, Protocol Buffers官方网站这样描述它的优点:
& a4 [3 ~! z( H. M2 @- f$ W❑平台无关、 语言无关;
8 }4 P* V9 V( I( u6 J; V; v% ]❑高性能, 解析速度是XML的20~100倍;
$ W& s+ V; Y2 ]❑体积小, 文件大小仅是XML的1/10~1/3;" B5 X* B( R+ L/ w0 \- I
❑使用简单;2 V8 E- e( k, W5 f
❑兼容性好。/ z! b) b2 C( l& c& h5 ?
通常编写一个Protocol Buffers应用需要以下三步:; d. s- a z! E4 e7 @- ]: r$ T
1) 定义消息格式文件, 通常以proto作为扩展名;
* z9 y4 m S7 {2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++、 Java、 Python三类语言) 的代码文件;1 f+ v8 h; N D- s+ E0 {4 E
3) 使用Protocol Buffers库提供的API来编写应用程序。: Q; P5 k0 F* b
为了说明Protocol Buffers的使用方法, 下面给出一个简单的实例。
( `* R( ~3 f% K9 U0 C该实例中首先定义一个消息格式文件person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将
3 ^" B+ N# p. v! ^2 R一个人的信息写入文件。2 E3 A8 ^1 g; ]+ m6 v( @9 b. p. {) ?
步骤1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:
) E' J" [: o3 w! X/ F4 Xpackage tutorial; //自定义的命名空间
6 [% K4 w( g. U2 p$ w7 |' ?option java_package = "com.example.tutorial"; //生成文件的包名
6 h# u0 M3 q* |( loption java_outer_classname = "PersonProtos"; //类名
$ c, u& ?; B" W5 ^7 n( fmessage Person { //待描述的结构化数据9 w! C$ q/ Z+ P2 c+ l. {
required string name = 1; //required表示这个字段不能为空
+ Z# g$ q$ w1 P, {) q8 b, ?; X+ C8 nrequired int32 id = 2; //数字“2”表示字段的数字别名4 a8 ?, P" o2 Y; U+ a0 |$ J P
optional string email = 3; //optional表示该字段可以为空) t) R- H+ r& p/ _- F% _5 y
message PhoneNumber { //内部message( ?! o3 {# k5 l5 S6 f; l
required string number = 1;
3 H# T) C5 T6 |( J) roptional int32 type = 2;% ?& k( I* E+ B% @
}r+ ]8 p; K; {# n. _
epeated PhoneNumber phone = 4;% f+ i. Q: `) \+ ~+ f+ n
}
! i. e3 Y; x: ~步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:
, x }& P6 S, c* n% L1 mprotoc -java_out=. person.proto; A) R* i/ c/ q- V! b) K
注意, 上面的命令运行时的当前路径是person.proto所在目录。. G( C, u8 v m+ I* S c2 Q2 a
步骤3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt
1 ~8 F. |2 R" p2 l$ C" s中, 之后又从文件中读出并打印出来。
- j5 d+ h1 p5 M* f" Jpublic class ProtocolBufferExample {
2 b* g+ p0 }- Q8 c+ e# I# P3 H. n; Ustatic public void main(String[] argv) {% g# b: k* o1 L d+ [; j( e
Person person1 = Person.newBuilder()
; m0 e$ ^" I2 o6 M4 |2 u.setName("Dong Xicheng")% k7 [9 r% D& X) G" U
.setEmail("dongxicheng@yahoo.com")- x( t' b8 q5 u; @
.setId(11111). W3 ?0 |; r% d p8 ~7 h% X( l
.addPhone(Person.PhoneNumber.newBuilder()+ `, f) I7 x9 _. V3 E
.setNumber("15110241024")
# [ ?! X7 ^4 m) l& I.setType(0))8 Q+ _8 e3 }* n2 [! y4 j
.addPhone(Person.PhoneNumber.newBuilder()
% O& H. ~( N. N.setNumber("01025689654")" h3 P: f0 H4 g% U6 x
.setType(1)).build();
# g% G+ X( V; K& v/ n8 b/ Ntry {$ e' |- H5 ~6 ^0 c) Z
FileOutputStream output = new FileOutputStream("example.txt");
, f8 C# T1 W2 ?/ H' Vperson1.writeTo(output);/ \6 i O' p) F: p+ l8 T; _
output.close();
$ E$ `/ F% z9 D+ T} catch(Exception e) {- E @. p+ L8 l9 d5 V
System.out.println("Write Error! ");) j3 P% ~/ e0 s8 Z5 T' c
} t# r! i* `4 j7 r9 t
ry {
4 d; X2 T0 Q/ oFileInputStream input = new FileInputStream("example.txt");
3 A, |8 \1 j6 B3 z8 x" i8 J* k+ z6 ]Person person2 = Person.parseFrom(input);
% R$ ]- H6 ^ O4 p0 a; p+ USystem.out.println("person2:" + person2);. v# o" r% q( v, a
} catch(Exception e) {/ \9 z9 o! H( \+ S$ A/ v
System.out.println("Read Error!");, r. X; m% m" v* R$ |4 ]- T
}$ o- r5 G/ O$ a
}& G7 W% s: e* `$ I/ H3 a" Y
}& a& I6 d9 _# L2 p" O
在YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引2 Q+ y* f7 Z, w: _8 U- \
入使得YARN在向后兼容性和性能方面向前迈进了一大步。$ G: s4 ?: ]( B& s4 t( O4 o+ d
除序列化/反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而
, |) R& t9 h+ @) u) w, T1 o$ |YARN则采用了MRv1中Hadoop RPC库, 举例如下:
; q: n; _( T& Y( @$ q& \service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义
. V9 N. ^, `4 g. O z) k" Orpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);
* D+ S9 G3 _; U* J- Y! b2 _rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);& L8 s- \* ]* ]! o5 t4 [1 w1 ?0 A+ s0 L
rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
+ n1 T+ |* L0 j- k9 \ m+ |}
! g) f4 }6 S2 l' ~在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:2 {& ^0 o" P' F7 ~
❑applicationmaster_protocol.proto: 定义了AM与RM之间的协议—Application-MasterProtocol。
6 P( T/ }: R9 l% G) x7 P1 _❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol。
+ Z8 L! K3 t7 h- v; }& d0 s' L! m7 N❑containermanagement_protocol.proto: 定义了AM与NM之间的协议—Container-ManagementProtocol。/ Q* }' X/ p* g, _
❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议—, z, F; M9 R# K: o/ r
ResourceManagerAdministrationProtocol。
0 L! \! N6 ~! e X3 m& u/ _ K; }* i❑yarn_protos.proto: 定义了各个协议RPC的参数。
& K0 W+ m% ], w4 G/ p) b+ K/ h❑ResourceTracker.proto: 定义了NM与RM之间的协议—ResourceTracker。; t4 o6 }9 z% M; F. ~, W0 B2 n* |
除了以上几个内核中的协议, YARN还使用Protocol Buffers对MapReduce中的协议进行了重新定义:& V; g' p* r5 k5 {* r: h ]
❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol。
3 w2 o5 p2 C5 b❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。* ? J, r1 Q/ i# l2 L, T/ v8 J6 w
3.2.2 Apache Avro
# b, z: I* o' \' uApache Avro [3] 是Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。
8 y! S/ t4 T, a) b2 ]Avro官网描述Avro的特性和功能如下:
$ c$ H4 I* B+ d: j1 F❑丰富的数据结构类型;+ ]! |! k) X3 Q2 A
❑快速可压缩的二进制数据形式;
; i, h: p( t2 v' j) M; E2 Q& i❑存储持久数据的文件容器;
, r, K: [/ O- k [! s7 R1 b9 \❑提供远程过程调用RPC;( [' w3 f' F) F8 [ T. E3 ~& R
❑简单的动态语言结合功能。
; ?. b( h6 ? k0 ~2 L3 x- P5 F g相比于Apache Thrift 和Google的Protocol Buffers, Apache Avro具有以下特点:
$ d1 U8 }+ Y, C. ^7 l- W9 r+ ?❑支持动态模式 。 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。" g9 N2 I9 x1 e. W
❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于
- [6 T% r3 J5 B3 @4 k减少序列化后的数据大小。
( L" ?/ ?( y+ b; p& q❑无须手工分配的域标识 。 Thrift和Protocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域" y. g( ?. ?2 f6 W* |3 E/ v7 \: ?
名, 该方法更加直观、 更加易扩展。- \8 p7 L- P4 O/ Z9 B
编写一个Avro应用也需如下三步:
/ U4 ` e) ?3 j1) 定义消息格式文件, 通常以avro作为扩展名;
7 ]# y" l9 M+ P. t0 Z2) 使用Avro编译器生成特定语言的代码文件( 可选) ;
% g7 y1 T" z; Z$ H! K% B3) 使用Avro库提供的API来编写应用程序。
C( Q9 ^$ e( b9 w3 K6 x, z, u( B O下面给出一个使用实例。' ?+ w# y! Z; x- \
步骤1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:
# D8 A4 W# i& o1 ]{"namespace": "com.example.tutorial",
! k. X0 \; b. x% }$ y; q1 S8 Z"type": "record",0 k! o1 R7 ~. Z0 l) y3 ?
"name": "Person",
# |( w+ |8 X% u9 c0 W% } G# f' R. d0 h"fields": [
) k; ]6 o8 }* M{"name": "name", "type": "string"},6 U7 F) z1 u. R* M" K% J f
{"name": "id", "type": "int"},
; \, \! y' @7 l- \{"name": "email", "type": ["string", "null"]},$ w& P. p0 @2 O" U
{"name": "phone", "type": {"type": "array",! J* R0 N, T) P- \4 p8 ^
"items": {"type": "record", "name": "PhoneNumber",
9 A7 J/ Z' y% ?, S/ n"fields": [. m' w, |3 o, x! X! S. p- ]! _% j4 |
{"name": "number", "type": "string"},
6 Z; @# A1 R) j4 L% a{"name": "type", "type": ["int", "null"]}
6 R* `, o, Q$ S4 D8 c0 e+ h]
0 h" G, g; E8 t; s4 M% v' X}
' B+ o% X& ^1 m}1 E8 x3 n) _0 F/ J
}]/ o( y4 c/ x T; B1 A
}8 q9 O+ F% s$ T& Z
步骤2 使用Avro编译器生成Java语言, 命令如下:+ G! G5 c/ R3 a4 b1 B9 n. P. A
java -jar avro-tools-1.7.4.jar compile schema person.avro .
& ~- C0 q3 p5 {& E/ W注意, 上面的命令运行时的当前路径是person.avro所在目录。1 |0 d. ?; b2 [0 I: t* b9 h. A
步骤3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文
5 E* v3 a4 O5 {% k$ m& G5 k( A% p件中读出并打印。
; G# q0 C5 r6 Mpublic class AvroExample {3 I7 W4 [' K" f9 e. v0 j" E0 D. T6 C
static public void main(String[] argv) {
! n, b9 o8 |7 X- o& U9 oPhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
$ O9 T/ {/ f- o3 w$ v) k* o' e.setNumber("15110241024") G+ b# b4 |3 a y
.setType(0).build();
5 A2 F4 P4 R: _& Q6 D8 P+ xPhoneNumber phoneNumber2 = PhoneNumber.newBuilder()4 Z( T' U! I3 a2 g+ C
.setNumber("01025689654")
4 M8 \8 W( E) X- X" \' {.setType(1).build();
2 `1 v+ ~" t8 X2 _0 D nList<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();4 s) k9 w7 I3 g: ^9 F6 R5 ~9 p; o
phoneNumbers.add(phoneNumber1);6 U& A- P! n1 w9 o; N% V' x% l
phoneNumbers.add(phoneNumber2);* Q h/ ]0 M/ K* u/ i, G5 H
Person person = Person.newBuilder()4 E/ p! @8 o# J: z& v5 @4 G
.setName("Dong Xicheng")- ?- G" _* z6 j- e+ V& b
.setEmail("dongxicheng@yahoo.com"), y8 b: M. C8 d% W" C
.setId(11111)
# ~9 S! T% h/ D; ?: _, F/ ^.setPhone(phoneNumbers).build();0 P, j$ i* d! K5 d' h
File file = new File("person.txt");& w/ j0 B! i( P2 H/ |, O
try {
% F& R1 Y' q. W! y; }1 z9 P; }DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);
, K( L$ i: n$ z. c: ~DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
$ D5 g4 D0 L( B% g" q- Z, GdataFileWriter.create(person.getSchema(), file);' g- c" ~3 N5 N
dataFileWriter.append(person);
+ Z. Q1 u [. ]# z+ ldataFileWriter.close();. o' @( b# J; o9 Y5 H- G
} catch(Exception e) {
3 B/ y7 E# S! Y: \# sSystem.out.println("Write Error:" + e);9 t; q; o5 Z, w" K) x+ Z- ?
}t9 E0 Y: W( `# s# X* z6 B P* ]
ry {
4 c1 j; b3 O4 V! L3 k7 |DatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
3 q: p6 M' f) e7 n# yDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);
$ M# N |5 `" aperson = null;5 _3 O! y# O0 j7 d0 `
while (dataFileReader.hasNext()) {: m/ ^0 d6 e' e! o8 z# [; `
person = dataFileReader.next(person);" }* W, ]4 M0 Y. e$ ]. ^- V" U' a5 j
System.out.println(person);+ V8 v, X& y7 U% X
}! a) C+ }1 z1 X6 S4 c% {
} catch(Exception e) {- ~/ ~7 m- w% f# `# ]
System.out.println("Read Error:" + e);5 b( T1 }' C* K& t
}
$ P4 r4 C+ Y0 D}
9 ?$ w4 ^4 E& o1 ^) x}
0 N& P5 z8 l0 X5 ]如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 。0 e" K1 O' c/ w' O" X1 s5 m
Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] , YARN暂时采用Protocol Buffers作为序列化库, RPC仍3 S1 Z3 U `9 {
使用MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化( n' g2 r, Z# @' g/ F2 p
均采用Avro完成, 相关定义在Events.avpr文件中, 举例如下:" r% x. K1 i" {2 B5 ]7 _
{"namespace": "org.apache.hadoop.mapreduce.jobhistory",( Q0 L% J8 \- Z" P
"protocol": "Events",( @+ D! {; U' }2 G* _- D- ]& J
"types": [6 o+ e) Z: M$ e9 D* Y! H
…{& P8 |' ]0 [/ i+ d
"type": "record", "name": "JobInfoChange",
0 b* u5 a3 ?; j, o! [" l! R"fields": [, n% n1 T% H! M! s! E. E9 [7 K
{"name": "jobid", "type": "string"},' }6 [% f2 F. I1 A$ L6 E- j$ U0 d# {% [
{"name": "submitTime", "type": "long"},
0 k3 e! T2 x* x& @5 N$ a{"name": "launchTime", "type": "long"}
! @0 E* u5 Z/ V) d$ W]1 v* o$ e6 x$ J
},
# K. m- x N7 C{"type": "record", "name": "JobPriorityChange",2 q$ ~ I" [5 Y
"fields": [" ~ ^- S2 r8 a+ X% W
{"name": "jobid", "type": "string"},
) q" W9 G [: y" u{"name": "priority", "type": "string"}
+ S9 ]: {; e2 p/ S$ S+ f, K* e]$ k+ [3 E X" Y) O9 C5 D7 p
},
2 E1 h7 }- O; y6 _* n2 M, |{"type": "record", "name": "JobStatusChanged",
/ _6 o. C1 w1 A$ c/ E0 ^# h"fields": [
! U( m! A8 V) R9 @- k, Q$ J{"name": "jobid", "type": "string"},% o% i5 R) _+ [# Y( d& ^& b( X6 X
{"name": "jobStatus", "type": "string"}) s J$ M; H3 C! T; z8 |5 g/ |1 e
]/ p9 X a2 M" _8 { h) ~
},. H. p( F" j/ [, w" ?: N
…]* \7 m* T3 U9 ?2 p1 A$ }- ?
}
/ P; b! J6 O+ q+ t4 O[1] 参见网址http://code.google.com/p/protobuf/。) M8 e5 ]$ O+ P: O1 o8 |
[2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns。: I' O& S* e! D& [4 o
[3] 参见网址http://avro.apache.org/。% j e0 d2 Z; x; {7 K4 l! I* h8 u
[4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html。6 \/ L- N. u* I* z$ |- T1 ^! G
[5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。 3 R1 L4 X8 n( u) D+ @
/ S' {% |; H; t$ _9 a
5 \1 ]& F$ }% _4 W, s( i6 N |
|