|
3.2 第三方开源库 H8 u# |+ [# G) G
3.2.1 Protocol Buffers8 I& h' P- N1 v# f/ f1 J+ S( |
Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
! j9 q! k4 R/ A& j4 K; RRPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持
+ c) q [ O$ P) [) J+ |C++、 Java、 Python三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers。
% D% z5 n) h% B9 g0 ~% X% I相比于常见的XML格式, Protocol Buffers官方网站这样描述它的优点:
1 z l5 z9 P6 T6 d❑平台无关、 语言无关;# c8 {* K: k" d$ J9 c6 s. D: a
❑高性能, 解析速度是XML的20~100倍;
6 @- {3 d3 G% }❑体积小, 文件大小仅是XML的1/10~1/3;5 u( u$ Y/ P4 p- v* B2 m
❑使用简单;
: R8 [6 N/ t# N9 f& O❑兼容性好。
" G. b3 [9 I5 _通常编写一个Protocol Buffers应用需要以下三步:0 e/ d( f4 h: t% c5 F
1) 定义消息格式文件, 通常以proto作为扩展名;
* J3 V7 g) K" c2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++、 Java、 Python三类语言) 的代码文件;
/ F' a. |' s e4 x/ l( Q- ?3) 使用Protocol Buffers库提供的API来编写应用程序。+ y' W& i2 i; l9 p, ~- X" B
为了说明Protocol Buffers的使用方法, 下面给出一个简单的实例。 }7 X( `0 z8 {4 [+ K2 e
该实例中首先定义一个消息格式文件person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将" K6 [1 F* A+ f7 W" S. I, n: |
一个人的信息写入文件。% `( V! t- k6 H' o5 m2 [$ `
步骤1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:
7 W4 X8 h% Y. v" ~6 ]7 V4 p/ Apackage tutorial; //自定义的命名空间
) Z7 L: l$ d' T7 h. Eoption java_package = "com.example.tutorial"; //生成文件的包名: V& r J+ r, C) g! h6 E3 m
option java_outer_classname = "PersonProtos"; //类名
3 ~5 u" O7 Y9 g/ q; S, G4 rmessage Person { //待描述的结构化数据
3 n9 E+ A' x' n' y/ r7 E0 S2 srequired string name = 1; //required表示这个字段不能为空
$ @$ J3 D) @; y! Frequired int32 id = 2; //数字“2”表示字段的数字别名
5 b2 R7 V( ~+ o% }. q9 G6 Ioptional string email = 3; //optional表示该字段可以为空, i. Y5 o! A2 _! n: S
message PhoneNumber { //内部message& P8 d" ^' C& }8 m2 Q
required string number = 1;
$ r/ g8 N. D% m+ g/ x6 X" u. y. ?6 n: Woptional int32 type = 2;& g- w- O/ T! J
}r4 x6 y8 r" |* s! P( Q- \
epeated PhoneNumber phone = 4;2 k: E0 t0 q- f* b/ h
}4 w t3 q. @5 S. S
步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:
. ^, G; M3 G; m& g) g( hprotoc -java_out=. person.proto
% a9 I, q9 A+ f, F注意, 上面的命令运行时的当前路径是person.proto所在目录。; a {0 T [6 T
步骤3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt
c& e8 n3 r0 E+ R# d' Q5 ~: z! G; S中, 之后又从文件中读出并打印出来。& X) a* j9 b& h3 ~
public class ProtocolBufferExample {
' h Y2 R& |0 v& }: z( `8 i+ v! Zstatic public void main(String[] argv) {: t6 a. @% v, ]" y
Person person1 = Person.newBuilder()# E9 G* R4 }9 A* W4 A/ m3 d
.setName("Dong Xicheng")
! P; ~# k! @5 O.setEmail("dongxicheng@yahoo.com")0 j- R/ x) R: x# C% a
.setId(11111)
0 E7 z+ v u( `8 N: y% c.addPhone(Person.PhoneNumber.newBuilder()
1 z: n3 \/ G& r.setNumber("15110241024")) x% Q4 w; Y8 J
.setType(0))
1 n4 }' _2 u' o# w8 E6 B.addPhone(Person.PhoneNumber.newBuilder()" S/ ~6 N( E" J: P7 x: \* ~
.setNumber("01025689654")- G: \& _* y7 h" V
.setType(1)).build();
2 G) N+ F, ^1 Etry {# n$ k2 I7 O' B3 \- W
FileOutputStream output = new FileOutputStream("example.txt");
( M3 v- s' f8 e5 d: e8 ~person1.writeTo(output);
- n* y" j* D+ l1 d& W; t& |output.close();6 E8 `7 F+ t5 z& ?+ r' H
} catch(Exception e) {" z# Q: U" u2 r1 T
System.out.println("Write Error! ");3 Q! M) T9 c* P5 S- G6 A( L
} t3 P* o3 U0 U& `- J/ P4 {6 f/ ]
ry {& L! e8 I3 Q. w/ k# r$ I, J, B
FileInputStream input = new FileInputStream("example.txt");7 g4 J4 Z1 `! A4 R& G
Person person2 = Person.parseFrom(input);
6 s* |4 n, C( A. X# w+ fSystem.out.println("person2:" + person2);
' O! Z2 b) | k: j0 i2 G5 D} catch(Exception e) {2 t: K: _* t7 [# H: d7 e6 K
System.out.println("Read Error!");
* P& a8 B0 n7 d& t: j5 \% d: t, I% P}0 J$ [& c+ T0 ~9 _
}
" c9 O P% }" Q1 ^% R3 I" R}
7 M6 Y& Y7 Y" t! J( L4 m7 ~8 G4 P6 H在YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引 ]. ~+ p% G, v
入使得YARN在向后兼容性和性能方面向前迈进了一大步。& [0 |( v! O" _" u
除序列化/反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而- T- j, P8 b. k2 I& L" }
YARN则采用了MRv1中Hadoop RPC库, 举例如下:
7 q4 u! ~4 V& h- k- Q4 qservice ContainerManagerService { //这是YARN自带的ContainerManager协议的定义
& s2 W" s' f3 Drpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);8 B- @% b% T4 `# W# D. y6 h( @. W
rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);' M7 [) F) r" X3 e S7 Q
rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
6 S$ F# |% K# I" I; g* ]& S}& V6 R# e9 m6 Z! g* c
在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:
% B$ z- B( V8 X7 v) g❑applicationmaster_protocol.proto: 定义了AM与RM之间的协议—Application-MasterProtocol。
m# J Y* l& ]; C [- s❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol。- Z/ \. U; u9 o' s# r6 `3 T1 ?7 }
❑containermanagement_protocol.proto: 定义了AM与NM之间的协议—Container-ManagementProtocol。; P8 f8 y$ f$ r% R
❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议—
& M- b/ r; z8 MResourceManagerAdministrationProtocol。
4 e: P! L8 }6 u4 G1 \5 f2 k❑yarn_protos.proto: 定义了各个协议RPC的参数。
3 u0 w P! H3 s- d4 P$ S* e❑ResourceTracker.proto: 定义了NM与RM之间的协议—ResourceTracker。3 H/ S& l+ a7 L( ^& |2 v
除了以上几个内核中的协议, YARN还使用Protocol Buffers对MapReduce中的协议进行了重新定义:5 o7 P5 _8 ^$ q. a$ z; g- S
❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol。( V. W- F: Z. a9 N, Y" k
❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。, g7 }6 [+ n1 M* y$ {( g
3.2.2 Apache Avro
/ @3 [* V5 J! B2 c, `2 PApache Avro [3] 是Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。8 p; o- l& ?( O- _. S
Avro官网描述Avro的特性和功能如下:8 v- x w1 {& b2 e/ v( ]$ \3 v
❑丰富的数据结构类型;1 h8 @/ X1 m& u; Y% C& }) J6 A
❑快速可压缩的二进制数据形式; R% l6 P5 e( y- ?$ g
❑存储持久数据的文件容器;
5 K, O% u8 Z8 c❑提供远程过程调用RPC;
# I0 B' d" w6 N( A❑简单的动态语言结合功能。
8 t8 x" N/ d/ n相比于Apache Thrift 和Google的Protocol Buffers, Apache Avro具有以下特点:0 S: _4 U0 [, I8 Y X
❑支持动态模式 。 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。
}: z4 z" e2 ]0 Z B❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于% M7 U- v+ Z {, d& A/ P
减少序列化后的数据大小。
6 ]! n1 w% Z2 }/ u; A❑无须手工分配的域标识 。 Thrift和Protocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域
9 L9 o9 g8 a' ^- z, A! {0 r4 q名, 该方法更加直观、 更加易扩展。
9 w( W" z- O4 h* m$ F7 ^编写一个Avro应用也需如下三步:, N8 j$ ^! H- ?. s
1) 定义消息格式文件, 通常以avro作为扩展名;
6 E2 ^+ `' J+ ~$ V, _" @2) 使用Avro编译器生成特定语言的代码文件( 可选) ;8 I. X3 P7 q( J) A" i2 C
3) 使用Avro库提供的API来编写应用程序。* P5 C) b+ k+ e* p# o
下面给出一个使用实例。% Q( Y* ^1 T; Y
步骤1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:
1 a$ y3 ?$ }" [7 E5 w{"namespace": "com.example.tutorial",8 c3 H4 P; ]6 z' Q
"type": "record",
: x2 A) a) ^* v" q7 a"name": "Person",
P& P7 W% Q& Q! g"fields": [
. Y' b; L0 ?' e' N& @{"name": "name", "type": "string"},
9 i. e3 y" l& u; o# ^5 i{"name": "id", "type": "int"},& c% P9 }* J3 e; D
{"name": "email", "type": ["string", "null"]},
- L) {+ v0 { A- K" y, U3 \{"name": "phone", "type": {"type": "array",
, {9 @8 X# m" l"items": {"type": "record", "name": "PhoneNumber",
! h6 D/ f$ {3 H% K2 w"fields": [
. N# ]# k' K! c' z" U{"name": "number", "type": "string"},
7 s% f8 c* y, H0 ]6 f7 V' f! t{"name": "type", "type": ["int", "null"]}
! D G! \: ~/ T: G& D]
# U' |# J! a8 q8 L. y}, [9 ]5 H* n7 B3 A
}
6 M F& d4 }3 m# w$ Q8 S}]
5 G- u* Q: g1 [}6 U) B! ]# b% ^4 C
步骤2 使用Avro编译器生成Java语言, 命令如下:& U- T) g3 W7 E3 z( ~
java -jar avro-tools-1.7.4.jar compile schema person.avro .# V) g+ @$ p5 I& y$ Y
注意, 上面的命令运行时的当前路径是person.avro所在目录。0 k7 v/ i" K6 G; e \
步骤3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文
5 |% u/ P% J5 G' {7 Q0 C+ j6 T件中读出并打印。
( a6 c2 r) c, s( O2 T+ ^0 mpublic class AvroExample {, d/ y% ?" ], ^
static public void main(String[] argv) {
! O5 K0 V) L) J0 \: Q" T9 s7 RPhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
. j4 A" A2 u5 [$ L. d% [1 o.setNumber("15110241024")6 x5 l) B+ D5 m- K9 `
.setType(0).build();. T; g3 P" Q3 V0 E- _8 h
PhoneNumber phoneNumber2 = PhoneNumber.newBuilder()" F+ Q7 k+ {( j4 t z; V
.setNumber("01025689654")
$ E& n3 ?' I% {.setType(1).build();% e+ ? v' ~6 P- N/ K0 m
List<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();
7 w U& j8 N# k6 [% rphoneNumbers.add(phoneNumber1);
, X5 w% h6 _8 \9 mphoneNumbers.add(phoneNumber2);
6 C9 j: ~ V5 u( j" B* ~Person person = Person.newBuilder()
8 q- m0 r" J ~; n.setName("Dong Xicheng")
) t, {! w* i9 p- N/ G+ f$ X.setEmail("dongxicheng@yahoo.com")- z0 A9 f& B7 f, e! E: I
.setId(11111)- U8 w6 H1 d1 s$ D! Q
.setPhone(phoneNumbers).build();" j5 g4 j/ g- u) D- J" H3 U
File file = new File("person.txt");
W8 }& u( L: \6 N3 v1 n. itry {
0 s; v5 ]7 b3 R# ~1 h7 [6 W6 fDatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);4 K) F% A) Y9 K+ k G" ^1 w/ V
DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
) ]) i" m6 B( J8 PdataFileWriter.create(person.getSchema(), file);$ g q' g8 e( G6 R" d* X3 U/ S
dataFileWriter.append(person);
! I5 v9 u4 u7 P+ H2 AdataFileWriter.close();& ?: g2 |5 S; y# A- Q; E" s
} catch(Exception e) {
' a Y$ |" T: L5 l0 {) rSystem.out.println("Write Error:" + e);
& g- Z0 `7 G" c. h D, u8 L* E) o1 p}t9 R2 i- A% f _" P
ry {
* v% K5 H u9 m8 y( x& BDatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
9 X: {$ T5 k* e( X! u% l7 Z# XDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);
9 I$ r- r- n& K/ h R# R5 zperson = null;
! x: q9 C8 I3 c0 {while (dataFileReader.hasNext()) {2 _$ [ x; p3 M0 F' F6 |6 |
person = dataFileReader.next(person);3 v5 {4 K0 B& L7 v( n
System.out.println(person);9 S8 j# h3 v1 `9 o G$ z
}
; H; }- W }) A; A a' g} catch(Exception e) {7 N0 g+ R( s$ s$ }
System.out.println("Read Error:" + e);
( b1 ]: O4 k! k- z; b- t: D& n) [}! P/ q' b. _2 \+ M# l
}5 P+ u7 w5 k& X( c8 x' ? L
}
$ e t9 b7 C# a如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 。) a% z, \) F& }/ \* [) v9 U
Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] , YARN暂时采用Protocol Buffers作为序列化库, RPC仍) Z6 V9 X: e8 G! e! B7 q
使用MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化
8 d3 \! c1 `" f3 s8 b- h9 G- F) p2 y均采用Avro完成, 相关定义在Events.avpr文件中, 举例如下:
! O4 u: C: [) U& X{"namespace": "org.apache.hadoop.mapreduce.jobhistory"," k" u& y( t" X4 a
"protocol": "Events",
: S7 _2 z7 s- l! V"types": [8 E5 H2 Y8 d' L. n' t) q I, ^
…{
0 G, f( ]" C1 R2 K"type": "record", "name": "JobInfoChange",+ b) ]& R3 j" E6 U1 r
"fields": [
! [% K/ o4 Z/ T ?% U{"name": "jobid", "type": "string"},' k: Y+ F' n! a8 Y
{"name": "submitTime", "type": "long"},6 D) D, _9 Q$ e$ y+ S' t+ D
{"name": "launchTime", "type": "long"}
3 E4 c8 A: A. D+ J7 z2 j+ t]
! T3 [ h( I7 z& f$ W5 ~},6 r$ l% E- T- p$ w9 [3 w* i1 d$ T
{"type": "record", "name": "JobPriorityChange",
3 e. T/ o. D$ M"fields": [- n5 p" T9 L4 h, I% n4 i0 C
{"name": "jobid", "type": "string"},
# |5 k4 B2 y) a& u9 J) i7 w{"name": "priority", "type": "string"}: x$ P9 x* k9 V6 Y/ `: z2 ?
]
3 b& b5 j! B0 K) j) x! l},1 e& V: `! h5 y0 u2 U6 y& E1 O5 V. I
{"type": "record", "name": "JobStatusChanged",
" L, i7 Q( x6 N( Y"fields": [( u {9 b* I8 b* w5 Q/ y: m
{"name": "jobid", "type": "string"}," d1 B4 W6 W4 K0 n! h. G( G
{"name": "jobStatus", "type": "string"}, I+ _# R4 L' B9 n
]& o7 \. @% B0 D9 _, x+ P$ U) m
},
/ u; b8 C8 d+ a% _; G…]
9 Q, Z6 c& d* K0 Q2 V}
9 E. q7 n: h" l[1] 参见网址http://code.google.com/p/protobuf/。
+ ? ]& M5 u9 }9 Y[2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns。: Y1 D2 X& ]3 g, _
[3] 参见网址http://avro.apache.org/。3 l* C& S: j9 b* e& s
[4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html。8 L' [( c" N# n0 y
[5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。 & Y: F3 U) I& M- \+ Q
7 S2 i5 D( c, {8 _) G8 f5 v2 j6 D2 e
T: k8 d. b: D* b& ]; y& F |
|