|
3.2 第三方开源库* o9 V+ K3 v! p
3.2.1 Protocol Buffers3 I) l- y: b! W: T9 m
Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或' z/ }2 P; Z7 K' f, L
RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持
1 O. H* s0 E/ j `% B7 v' mC++、 Java、 Python三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers。. P3 P9 R* v% {+ I
相比于常见的XML格式, Protocol Buffers官方网站这样描述它的优点:
. d' a9 g7 ?4 ^0 S, i❑平台无关、 语言无关;
! E! b7 V j3 e1 L❑高性能, 解析速度是XML的20~100倍;
1 G3 d$ y1 u# i/ g, u- V- S# L5 H: u! Q❑体积小, 文件大小仅是XML的1/10~1/3;4 a% i! |8 s; ~1 H
❑使用简单;
+ V; i* Z, k8 H4 f; }❑兼容性好。
7 S5 m: ^/ Y4 B( Y通常编写一个Protocol Buffers应用需要以下三步:
p" l X5 m. O, S/ {5 ^2 G1) 定义消息格式文件, 通常以proto作为扩展名;
+ ]$ L) |, L1 Y# E" v8 l2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++、 Java、 Python三类语言) 的代码文件;" s, d8 O/ G0 o# [
3) 使用Protocol Buffers库提供的API来编写应用程序。% \) x& F5 a7 ?" q& c
为了说明Protocol Buffers的使用方法, 下面给出一个简单的实例。
, H$ C+ l: Z8 j该实例中首先定义一个消息格式文件person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将% J' F8 I+ o3 `. p
一个人的信息写入文件。+ h; I3 S: D. h2 f8 {0 E7 l
步骤1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:
( h& [' M$ \2 A0 u- | apackage tutorial; //自定义的命名空间6 v9 J Q4 I( F
option java_package = "com.example.tutorial"; //生成文件的包名
# _5 b- f+ m3 woption java_outer_classname = "PersonProtos"; //类名
1 S) c9 F$ e3 x1 s7 x9 Vmessage Person { //待描述的结构化数据
9 O! H5 l6 N6 d& s) ^: E4 ]& |) Z# {required string name = 1; //required表示这个字段不能为空
1 E+ j' [4 i6 lrequired int32 id = 2; //数字“2”表示字段的数字别名
- A2 B* C9 P7 q; N% L4 @optional string email = 3; //optional表示该字段可以为空
0 ?- A2 b7 X4 e7 T3 `message PhoneNumber { //内部message
3 E3 Q* o* I3 u8 m+ O7 O7 F, |6 irequired string number = 1;$ Z* |/ N; E% J
optional int32 type = 2;
' j6 u+ D1 q6 f8 ~ p}r0 m; |! z: W4 K7 g3 Z0 [
epeated PhoneNumber phone = 4;
1 c, A# t* t/ A. U4 r}# T$ d& [# }7 [9 Y' v
步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:
" s2 r. x L4 Q0 oprotoc -java_out=. person.proto2 {4 c5 ~ C9 b" z# Q4 v7 m
注意, 上面的命令运行时的当前路径是person.proto所在目录。
: @4 \5 @* X2 B5 q步骤3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt
; v5 s% B. j, u3 e& V, Y+ ^- [中, 之后又从文件中读出并打印出来。
% H* _) V* J8 X# D% S% X" H2 L! Zpublic class ProtocolBufferExample {
! Y9 K: `! T: }3 p7 t! Jstatic public void main(String[] argv) {! G2 C3 F4 n" E- |# j- ?
Person person1 = Person.newBuilder() p5 {0 d- k1 n3 n( X) l
.setName("Dong Xicheng")
# z/ Z0 M) r K% N, s) t/ C$ F' q.setEmail("dongxicheng@yahoo.com")5 S" p7 {+ H8 e1 P' ~! z: k
.setId(11111)
% z- ?' e8 t, U0 t9 ~" y.addPhone(Person.PhoneNumber.newBuilder()
9 ]) z( r7 }: p$ ~) r.setNumber("15110241024")8 W5 ?0 d0 r% m) S% l: g
.setType(0))
2 s; v7 n% Z( K: V5 i0 V$ L.addPhone(Person.PhoneNumber.newBuilder()
. L* v5 ^( h1 A: J& t/ Y* }" z.setNumber("01025689654")
# H$ b1 e5 z' s.setType(1)).build();
/ `5 G- H0 d* B/ y z/ V/ w' x* Wtry {" D! N) |0 t4 b; J) v
FileOutputStream output = new FileOutputStream("example.txt");1 c* i2 [ l( M0 `
person1.writeTo(output);# R( r* b0 C5 J; a- ~/ T8 Z8 c
output.close();4 C4 X$ @' u/ M, G: d6 i4 X6 O; v ^
} catch(Exception e) {* `9 c9 r) p3 q3 S. d
System.out.println("Write Error! ");
0 A; k6 W7 v" D! c9 u} t
8 y4 ^( c" ?; E+ {ry {6 F. f+ R5 K8 m0 N3 v$ K
FileInputStream input = new FileInputStream("example.txt");
4 U2 l6 P1 x1 d- ` J+ RPerson person2 = Person.parseFrom(input);
2 c# O: V& p' k( ~ {: bSystem.out.println("person2:" + person2);
% C3 F5 b m' C0 L% S3 m1 i} catch(Exception e) {5 Q4 Y6 U- |) u. y
System.out.println("Read Error!");3 W9 v- L! F8 h4 `; W$ ]' U& L8 D. ?
}, a# A$ D2 C$ d% z) E
}
2 ?, x3 q0 `$ [}
# m7 g @ v; O) G# z在YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引& J" v* Y! @0 r; I
入使得YARN在向后兼容性和性能方面向前迈进了一大步。
& S, X$ E! E- g% Z9 V9 ~除序列化/反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而+ f$ |$ U4 l3 h$ C% Y
YARN则采用了MRv1中Hadoop RPC库, 举例如下:% e- u/ f, h5 ^4 f
service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义$ P9 X/ H/ h! X/ t& {
rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);
- e" S9 T Q# v7 _1 Orpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);
$ a% B+ N, I- g3 `rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
7 F g1 F9 g. E' b}$ U! O" F# }5 M y& O' T
在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:
& ]1 Q6 p1 S# |$ ?0 a f" z0 ?9 w❑applicationmaster_protocol.proto: 定义了AM与RM之间的协议—Application-MasterProtocol。
3 z1 T& T4 h/ R; h; o❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol。" r! @& d r/ c/ u9 b
❑containermanagement_protocol.proto: 定义了AM与NM之间的协议—Container-ManagementProtocol。" z) l9 k( [, F: X
❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议—
9 P5 _. C! U1 g9 ]+ n$ dResourceManagerAdministrationProtocol。
& ^% K7 `% ~$ H' e9 w6 t❑yarn_protos.proto: 定义了各个协议RPC的参数。0 }6 \; J8 P) z8 T, ~8 {0 z0 @
❑ResourceTracker.proto: 定义了NM与RM之间的协议—ResourceTracker。
& o+ Z9 @6 Q9 e- |除了以上几个内核中的协议, YARN还使用Protocol Buffers对MapReduce中的协议进行了重新定义:- S2 d6 A0 w5 t: E8 j. X B0 `
❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol。$ ~+ X! B+ O1 l- |" K
❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。, I& ?7 W R n
3.2.2 Apache Avro; c1 D7 @, z$ o a
Apache Avro [3] 是Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。: A& ] {( A! ~9 e/ ~; L) G3 b
Avro官网描述Avro的特性和功能如下:
. x2 D: J n, v) A: M❑丰富的数据结构类型;" Z& @8 g. c' Q L. n$ ?
❑快速可压缩的二进制数据形式;* Y6 P2 R/ ?) ~- y2 W
❑存储持久数据的文件容器;
- n* Q/ g' P) w1 p❑提供远程过程调用RPC;
2 Z5 \* Z9 x/ m; {3 ]❑简单的动态语言结合功能。, ^7 X$ r* y0 ~/ q! H
相比于Apache Thrift 和Google的Protocol Buffers, Apache Avro具有以下特点:
c. ?! F0 A! g❑支持动态模式 。 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。
+ Y0 u1 T* g& r( U7 j1 k" f❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于% M! a! W6 S! i1 C
减少序列化后的数据大小。" X9 M+ v C9 y6 g5 h/ R; `" ~
❑无须手工分配的域标识 。 Thrift和Protocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域
/ U# n% V- C L: {' @2 s+ m名, 该方法更加直观、 更加易扩展。/ j G- K8 e/ f
编写一个Avro应用也需如下三步:( \1 y% u: ~/ f/ v4 ^
1) 定义消息格式文件, 通常以avro作为扩展名;+ I) t2 d- O/ j
2) 使用Avro编译器生成特定语言的代码文件( 可选) ;
3 v: M$ E5 _, z, G' D3) 使用Avro库提供的API来编写应用程序。
9 E [2 A6 @" v8 D$ `" ]9 q* ?下面给出一个使用实例。, \; Z4 u. G6 g4 U, p& ~. l
步骤1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:
, n$ E3 _9 @; g( l/ B! f7 A{"namespace": "com.example.tutorial",
( ]" k. w3 b0 {* R6 A& J"type": "record",3 l; C q8 O3 I2 i( S0 U
"name": "Person",
$ O" n# h* @1 C8 G"fields": [
2 u! V8 U5 K, f$ Q{"name": "name", "type": "string"},% D1 v0 V. i" ^) ^3 H
{"name": "id", "type": "int"},
$ I/ H6 I- P/ c+ r5 C# r9 d& E; ^{"name": "email", "type": ["string", "null"]},
5 l7 m m9 n9 r9 |5 ^{"name": "phone", "type": {"type": "array",* F! x8 @2 | W
"items": {"type": "record", "name": "PhoneNumber",& z. ]7 k4 A% n1 |2 @
"fields": [
8 Y3 z. k3 M$ S+ Y3 W' G{"name": "number", "type": "string"},
5 C6 ]* I* h \8 q4 h3 C; Y# G1 p* m: H{"name": "type", "type": ["int", "null"]}$ e0 {% x' ]2 F, [: z( T0 } g
]/ Z' s8 B8 ]$ W5 r* }
}; x* J9 a) E! e
}5 ?# h* [6 k9 b7 o/ K5 X6 F+ E* k
}]" Y0 e( Y9 ]5 _, n( K+ O7 K
}% j; }2 l/ [6 x5 `7 k7 a
步骤2 使用Avro编译器生成Java语言, 命令如下:3 M% s) p4 R- n7 S9 N3 p+ }
java -jar avro-tools-1.7.4.jar compile schema person.avro .; s* |6 O% ?* D0 r G4 d
注意, 上面的命令运行时的当前路径是person.avro所在目录。, l- J% x9 `) K" {
步骤3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文. S- ^ ?1 O% x0 K* Z5 m6 M3 ]
件中读出并打印。% N( n2 g* J) f( f/ C
public class AvroExample {9 V$ o4 i: b! J+ a8 A* Q) M
static public void main(String[] argv) {
: E) j' U* ?! ZPhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
! V5 z { B% X- ?2 i- h: j.setNumber("15110241024")
5 C. r4 [8 H" b7 F3 P7 | h8 ~.setType(0).build();
2 {2 M3 M2 n1 u/ z2 vPhoneNumber phoneNumber2 = PhoneNumber.newBuilder()$ x( b1 B: m: J/ ?- ^% S1 |
.setNumber("01025689654")0 m7 Y5 p3 u8 g( q. B& g
.setType(1).build();- D/ f! E$ t' u
List<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();
$ j9 {$ _ R% j2 F% p4 G2 ^# y4 NphoneNumbers.add(phoneNumber1);/ ]! U9 [' j& d, S' c$ A/ L: }
phoneNumbers.add(phoneNumber2);9 y1 l4 R6 X6 ]6 d
Person person = Person.newBuilder()
( U2 N8 K6 c+ O1 }7 r: A1 R.setName("Dong Xicheng")
, F8 u( `3 X3 J* J, V.setEmail("dongxicheng@yahoo.com"): d3 Y t8 I5 z' ]% h: X* ?% B1 j
.setId(11111)* z, f3 _% g% A) v3 a* K
.setPhone(phoneNumbers).build();
. T& f/ Z1 c0 IFile file = new File("person.txt");5 `. Q8 _; X7 ]$ h$ j) L- b X, U
try {# c4 Z8 v Q" G! `& F
DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class); b3 R. u. W7 e
DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
0 Q L' N6 L: ?! v2 ^7 ldataFileWriter.create(person.getSchema(), file);
' y* N4 K4 `* x/ {4 mdataFileWriter.append(person);6 R3 {# f. {. g
dataFileWriter.close();
4 \1 e- E5 v3 z3 g( R} catch(Exception e) {9 M B" @/ ~. l; {
System.out.println("Write Error:" + e);
+ w8 v* `" X( I' g! o3 k; H}t4 A# [9 K" T* g, L F
ry {
0 l0 k3 Q- ?! @' z3 T8 i. lDatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
1 ^! R% y& H: K4 z6 _ P& uDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);. p: s" X, N- p9 r9 C
person = null;
8 }: F- w6 c8 @# hwhile (dataFileReader.hasNext()) {
7 U: V, ^, B* _! yperson = dataFileReader.next(person);
! v1 q0 R. c( |System.out.println(person);
0 X* Y+ P5 I7 Y# |" B}( W( _6 t* U! m+ R+ L% i
} catch(Exception e) {' K( M9 j$ O, m9 M
System.out.println("Read Error:" + e);& ]- J& ^* Y8 L! h5 M- C7 u. U
}- n" y0 V6 E% V5 m; F0 o
}
6 p$ {& }# L7 _! D! h$ e) U. u- m/ |}
5 E6 E( u6 B9 r6 h: L如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 。/ M& f( B7 T8 t1 X
Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] , YARN暂时采用Protocol Buffers作为序列化库, RPC仍
5 E/ a1 g+ N5 f* W# M; I使用MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化% F4 \ V/ ]" h S+ J1 z6 v
均采用Avro完成, 相关定义在Events.avpr文件中, 举例如下:
8 B+ M" {9 d A: G/ Y! Q' @* X* C. I{"namespace": "org.apache.hadoop.mapreduce.jobhistory",
6 f' S" E5 R0 {8 V"protocol": "Events",. k4 ?. O, V! }
"types": [+ r3 j P2 X# v7 \
…{
% \6 e$ z7 Q2 Q5 w+ w"type": "record", "name": "JobInfoChange",$ W5 l3 Q2 Q5 G; E2 }% `, U
"fields": [
5 U4 [; v. S7 | d% }{"name": "jobid", "type": "string"},
" d/ b1 ?' }: N7 P- Z% B' r2 ^8 K' H{"name": "submitTime", "type": "long"},; n+ Y+ D/ F* U# y) R# \
{"name": "launchTime", "type": "long"}
" `( R0 k/ d7 G2 M* _]
: }0 L, [8 [; c' L# |: V! `3 B" m: t& I},
/ Q6 D8 j5 B* H+ O6 ~9 }* c{"type": "record", "name": "JobPriorityChange",9 ` R4 n7 f2 ^% w& K
"fields": [+ a: [, ^" M; S% l& }& ]
{"name": "jobid", "type": "string"},
5 Q. _5 r7 f/ n7 J- O9 G{"name": "priority", "type": "string"}
7 c4 c! Y" F+ p) n/ G] B& P( z0 D, q" C
},
E* ^" ]% P: r, s; `( ]- n% }{"type": "record", "name": "JobStatusChanged",
y, I9 Z- ?! l9 f! k1 n& M+ T"fields": [8 d, t: c% v7 O
{"name": "jobid", "type": "string"},6 I# T. C- O* r4 @1 c8 @
{"name": "jobStatus", "type": "string"}
* G1 d5 K% C8 }2 Z7 U]
* L; p3 E, }; t8 s8 T},2 a2 m: E. e0 n
…]
$ r8 S2 p" h8 J" k}
" x S% W- p# n* B- n9 J[1] 参见网址http://code.google.com/p/protobuf/。2 ?0 i- C+ Q x9 k# r
[2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns。
9 A8 H4 o/ Z: I4 [0 p4 z[3] 参见网址http://avro.apache.org/。
! Q6 P. M9 ? @, U[4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html。8 F8 F3 P) J% a! @, `* h9 [
[5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。
6 F9 S/ m* C1 Z' a
* v; e# {% F! g3 P4 a8 K# D& V) p W1 h2 t. }" g- ~. M
|
|