java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 2992|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.2】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66093

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-14 11:11:39 | 显示全部楼层 |阅读模式
    3.2 第三方开源库  H8 u# |+ [# G) G
    3.2.1 Protocol Buffers8 I& h' P- N1 v# f/ f1 J+ S( |
    Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
    ! j9 q! k4 R/ A& j4 K; R
    RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持
    + c) q  [  O$ P) [) J+ |
    C++JavaPython三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers
    % D% z5 n) h% B9 g0 ~% X% I相比于常见的
    XML格式, Protocol Buffers官方网站这样描述它的优点:
    1 z  l5 z9 P6 T6 d
    ❑平台无关、 语言无关;# c8 {* K: k" d$ J9 c6 s. D: a
    ❑高性能, 解析速度是XML20100倍;
    6 @- {3 d3 G% }
    ❑体积小, 文件大小仅是XML1/101/35 u( u$ Y/ P4 p- v* B2 m
    ❑使用简单;
    : R8 [6 N/ t# N9 f& O
    ❑兼容性好。
    " G. b3 [9 I5 _通常编写一个
    Protocol Buffers应用需要以下三步:0 e/ d( f4 h: t% c5 F
    1) 定义消息格式文件, 通常以proto作为扩展名;
    * J3 V7 g) K" c
    2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++JavaPython三类语言) 的代码文件;
    / F' a. |' s  e4 x/ l( Q- ?
    3) 使用Protocol Buffers库提供的API来编写应用程序。+ y' W& i2 i; l9 p, ~- X" B
    为了说明
    Protocol Buffers的使用方法, 下面给出一个简单的实例。  }7 X( `0 z8 {4 [+ K2 e
    该实例中首先定义一个消息格式文件
    person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将" K6 [1 F* A+ f7 W" S. I, n: |
    一个人的信息写入文件。% `( V! t- k6 H' o5 m2 [$ `
    步骤
    1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:
    7 W4 X8 h% Y. v" ~6 ]7 V4 p/ A
    package tutorial; //自定义的命名空间
    ) Z7 L: l$ d' T7 h. E
    option java_package = "com.example.tutorial"; //生成文件的包名: V& r  J+ r, C) g! h6 E3 m
    option java_outer_classname = "PersonProtos"; //类名
    3 ~5 u" O7 Y9 g/ q; S, G4 r
    message Person { //待描述的结构化数据
    3 n9 E+ A' x' n' y/ r7 E0 S2 s
    required string name = 1; //required表示这个字段不能为空
    $ @$ J3 D) @; y! F
    required int32 id = 2; //数字“2”表示字段的数字别名
    5 b2 R7 V( ~+ o% }. q9 G6 I
    optional string email = 3; //optional表示该字段可以为空, i. Y5 o! A2 _! n: S
    message PhoneNumber { //内部message& P8 d" ^' C& }8 m2 Q
    required string number = 1;
    $ r/ g8 N. D% m+ g/ x6 X" u. y. ?6 n: Woptional int32 type = 2;& g- w- O/ T! J
    }r4 x6 y8 r" |* s! P( Q- \
    epeated PhoneNumber phone = 4;2 k: E0 t0 q- f* b/ h
    }4 w  t3 q. @5 S. S
    步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:
    . ^, G; M3 G; m& g) g( h
    protoc -java_out=. person.proto
    % a9 I, q9 A+ f, F注意, 上面的命令运行时的当前路径是person.proto所在目录。; a  {0 T  [6 T
    步骤
    3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt
      c& e8 n3 r0 E+ R# d' Q5 ~: z! G; S
    中, 之后又从文件中读出并打印出来。& X) a* j9 b& h3 ~
    public class ProtocolBufferExample {
    ' h  Y2 R& |0 v& }: z( `8 i+ v! Zstatic public void main(String[] argv) {: t6 a. @% v, ]" y
    Person person1 = Person.newBuilder()# E9 G* R4 }9 A* W4 A/ m3 d
    .setName("Dong Xicheng")
    ! P; ~# k! @5 O.setEmail("dongxicheng@yahoo.com")0 j- R/ x) R: x# C% a
    .setId(11111)
    0 E7 z+ v  u( `8 N: y% c.addPhone(Person.PhoneNumber.newBuilder()
    1 z: n3 \/ G& r.setNumber("15110241024")) x% Q4 w; Y8 J
    .setType(0))
    1 n4 }' _2 u' o# w8 E6 B.addPhone(Person.PhoneNumber.newBuilder()" S/ ~6 N( E" J: P7 x: \* ~
    .setNumber("01025689654")- G: \& _* y7 h" V
    .setType(1)).build();
    2 G) N+ F, ^1 Etry {# n$ k2 I7 O' B3 \- W
    FileOutputStream output = new FileOutputStream("example.txt");
    ( M3 v- s' f8 e5 d: e8 ~person1.writeTo(output);
    - n* y" j* D+ l1 d& W; t& |output.close();6 E8 `7 F+ t5 z& ?+ r' H
    } catch(Exception e) {" z# Q: U" u2 r1 T
    System.out.println("Write Error");3 Q! M) T9 c* P5 S- G6 A( L
    } t3 P* o3 U0 U& `- J/ P4 {6 f/ ]
    ry {& L! e8 I3 Q. w/ k# r$ I, J, B
    FileInputStream input = new FileInputStream("example.txt");7 g4 J4 Z1 `! A4 R& G
    Person person2 = Person.parseFrom(input);
    6 s* |4 n, C( A. X# w+ fSystem.out.println("person2:" + person2);
    ' O! Z2 b) |  k: j0 i2 G5 D} catch(Exception e) {2 t: K: _* t7 [# H: d7 e6 K
    System.out.println("Read Error!");
    * P& a8 B0 n7 d& t: j5 \% d: t, I% P}0 J$ [& c+ T0 ~9 _
    }
    " c9 O  P% }" Q1 ^% R3 I" R}
    7 M6 Y& Y7 Y" t! J( L4 m7 ~8 G4 P6 HYARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引  ]. ~+ p% G, v
    入使得
    YARN在向后兼容性和性能方面向前迈进了一大步。& [0 |( v! O" _" u
    除序列化
    /反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而- T- j, P8 b. k2 I& L" }
    YARN则采用了MRv1Hadoop RPC库, 举例如下:
    7 q4 u! ~4 V& h- k- Q4 q
    service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义
    & s2 W" s' f3 D
    rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);8 B- @% b% T4 `# W# D. y6 h( @. W
    rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);' M7 [) F) r" X3 e  S7 Q
    rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
    6 S$ F# |% K# I" I; g* ]& S}& V6 R# e9 m6 Z! g* c
    在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:
    % B$ z- B( V8 X7 v) g
    ❑applicationmaster_protocol.proto: 定义了AMRM之间的协议—Application-MasterProtocol
      m# J  Y* l& ]; C  [- s
    ❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol- Z/ \. U; u9 o' s# r6 `3 T1 ?7 }
    ❑containermanagement_protocol.proto: 定义了AMNM之间的协议—Container-ManagementProtocol; P8 f8 y$ f$ r% R
    ❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议
    & M- b/ r; z8 MResourceManagerAdministrationProtocol

    4 e: P! L8 }6 u4 G1 \5 f2 k
    ❑yarn_protos.proto: 定义了各个协议RPC的参数。
    3 u0 w  P! H3 s- d4 P$ S* e
    ❑ResourceTracker.proto: 定义了NMRM之间的协议—ResourceTracker3 H/ S& l+ a7 L( ^& |2 v
    除了以上几个内核中的协议,
    YARN还使用Protocol BuffersMapReduce中的协议进行了重新定义:5 o7 P5 _8 ^$ q. a$ z; g- S
    ❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol( V. W- F: Z. a9 N, Y" k
    ❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。, g7 }6 [+ n1 M* y$ {( g
    3.2.2 Apache Avro
    / @3 [* V5 J! B2 c, `2 P
    Apache Avro [3] Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。8 p; o- l& ?( O- _. S
    Avro官网描述Avro的特性和功能如下:8 v- x  w1 {& b2 e/ v( ]$ \3 v
    ❑丰富的数据结构类型;1 h8 @/ X1 m& u; Y% C& }) J6 A
    ❑快速可压缩的二进制数据形式;  R% l6 P5 e( y- ?$ g
    ❑存储持久数据的文件容器;
    5 K, O% u8 Z8 c
    ❑提供远程过程调用RPC
    # I0 B' d" w6 N( A
    ❑简单的动态语言结合功能。
    8 t8 x" N/ d/ n相比于
    Apache Thrift GoogleProtocol BuffersApache Avro具有以下特点:0 S: _4 U0 [, I8 Y  X
    ❑支持动态模式 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。
      }: z4 z" e2 ]0 Z  B
    ❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于% M7 U- v+ Z  {, d& A/ P
    减少序列化后的数据大小。
    6 ]! n1 w% Z2 }/ u; A
    ❑无须手工分配的域标识 ThriftProtocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域
    9 L9 o9 g8 a' ^- z, A! {0 r4 q名, 该方法更加直观、 更加易扩展。
    9 w( W" z- O4 h* m$ F7 ^编写一个
    Avro应用也需如下三步:, N8 j$ ^! H- ?. s
    1) 定义消息格式文件, 通常以avro作为扩展名;
    6 E2 ^+ `' J+ ~$ V, _" @
    2) 使用Avro编译器生成特定语言的代码文件( 可选) ;8 I. X3 P7 q( J) A" i2 C
    3) 使用Avro库提供的API来编写应用程序。* P5 C) b+ k+ e* p# o
    下面给出一个使用实例。% Q( Y* ^1 T; Y
    步骤
    1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:
    1 a$ y3 ?$ }" [7 E5 w
    {"namespace": "com.example.tutorial",8 c3 H4 P; ]6 z' Q
    "type": "record",
    : x2 A) a) ^* v" q7 a"name": "Person",
      P& P7 W% Q& Q! g"fields": [
    . Y' b; L0 ?' e' N& @{"name": "name", "type": "string"},
    9 i. e3 y" l& u; o# ^5 i{"name": "id", "type": "int"},& c% P9 }* J3 e; D
    {"name": "email", "type": ["string", "null"]},
    - L) {+ v0 {  A- K" y, U3 \{"name": "phone", "type": {"type": "array",
    , {9 @8 X# m" l"items": {"type": "record", "name": "PhoneNumber",
    ! h6 D/ f$ {3 H% K2 w"fields": [
    . N# ]# k' K! c' z" U{"name": "number", "type": "string"},
    7 s% f8 c* y, H0 ]6 f7 V' f! t{"name": "type", "type": ["int", "null"]}
    ! D  G! \: ~/ T: G& D]
    # U' |# J! a8 q8 L. y}, [9 ]5 H* n7 B3 A
    }
    6 M  F& d4 }3 m# w$ Q8 S}]
    5 G- u* Q: g1 [}6 U) B! ]# b% ^4 C
    步骤2 使用Avro编译器生成Java语言, 命令如下:& U- T) g3 W7 E3 z( ~
    java -jar avro-tools-1.7.4.jar compile schema person.avro .# V) g+ @$ p5 I& y$ Y
    注意, 上面的命令运行时的当前路径是person.avro所在目录。0 k7 v/ i" K6 G; e  \
    步骤
    3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文
    5 |% u/ P% J5 G' {7 Q0 C+ j6 T件中读出并打印。
    ( a6 c2 r) c, s( O2 T+ ^0 m
    public class AvroExample {, d/ y% ?" ], ^
    static public void main(String[] argv) {
    ! O5 K0 V) L) J0 \: Q" T9 s7 RPhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
    . j4 A" A2 u5 [$ L. d% [1 o.setNumber("15110241024")6 x5 l) B+ D5 m- K9 `
    .setType(0).build();. T; g3 P" Q3 V0 E- _8 h
    PhoneNumber phoneNumber2 = PhoneNumber.newBuilder()" F+ Q7 k+ {( j4 t  z; V
    .setNumber("01025689654")
    $ E& n3 ?' I% {.setType(1).build();% e+ ?  v' ~6 P- N/ K0 m
    List<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();
    7 w  U& j8 N# k6 [% rphoneNumbers.add(phoneNumber1);
    , X5 w% h6 _8 \9 mphoneNumbers.add(phoneNumber2);
    6 C9 j: ~  V5 u( j" B* ~Person person = Person.newBuilder()
    8 q- m0 r" J  ~; n.setName("Dong Xicheng")
    ) t, {! w* i9 p- N/ G+ f$ X.setEmail("dongxicheng@yahoo.com")- z0 A9 f& B7 f, e! E: I
    .setId(11111)- U8 w6 H1 d1 s$ D! Q
    .setPhone(phoneNumbers).build();" j5 g4 j/ g- u) D- J" H3 U
    File file = new File("person.txt");
      W8 }& u( L: \6 N3 v1 n. itry {
    0 s; v5 ]7 b3 R# ~1 h7 [6 W6 fDatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);4 K) F% A) Y9 K+ k  G" ^1 w/ V
    DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
    ) ]) i" m6 B( J8 PdataFileWriter.create(person.getSchema(), file);$ g  q' g8 e( G6 R" d* X3 U/ S
    dataFileWriter.append(person);
    ! I5 v9 u4 u7 P+ H2 AdataFileWriter.close();& ?: g2 |5 S; y# A- Q; E" s
    } catch(Exception e) {
    ' a  Y$ |" T: L5 l0 {) rSystem.out.println("Write Error:" + e);
    & g- Z0 `7 G" c. h  D, u8 L* E) o1 p}t9 R2 i- A% f  _" P
    ry {
    * v% K5 H  u9 m8 y( x& BDatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
    9 X: {$ T5 k* e( X! u% l7 Z# XDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);
    9 I$ r- r- n& K/ h  R# R5 zperson = null;
    ! x: q9 C8 I3 c0 {while (dataFileReader.hasNext()) {2 _$ [  x; p3 M0 F' F6 |6 |
    person = dataFileReader.next(person);3 v5 {4 K0 B& L7 v( n
    System.out.println(person);9 S8 j# h3 v1 `9 o  G$ z
    }
    ; H; }- W  }) A; A  a' g} catch(Exception e) {7 N0 g+ R( s$ s$ }
    System.out.println("Read Error:" + e);
    ( b1 ]: O4 k! k- z; b- t: D& n) [}! P/ q' b. _2 \+ M# l
    }5 P+ u7 w5 k& X( c8 x' ?  L
    }
    $ e  t9 b7 C# a如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] ) a% z, \) F& }/ \* [) v9 U
    Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] YARN暂时采用Protocol Buffers作为序列化库, RPC) Z6 V9 X: e8 G! e! B7 q
    使用
    MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化
    8 d3 \! c1 `" f3 s8 b- h9 G- F) p2 y均采用
    Avro完成, 相关定义在Events.avpr文件中, 举例如下:
    ! O4 u: C: [) U& X
    {"namespace": "org.apache.hadoop.mapreduce.jobhistory"," k" u& y( t" X4 a
    "protocol": "Events",
    : S7 _2 z7 s- l! V"types": [8 E5 H2 Y8 d' L. n' t) q  I, ^
    …{
    0 G, f( ]" C1 R2 K"type": "record", "name": "JobInfoChange",+ b) ]& R3 j" E6 U1 r
    "fields": [
    ! [% K/ o4 Z/ T  ?% U{"name": "jobid", "type": "string"},' k: Y+ F' n! a8 Y
    {"name": "submitTime", "type": "long"},6 D) D, _9 Q$ e$ y+ S' t+ D
    {"name": "launchTime", "type": "long"}
    3 E4 c8 A: A. D+ J7 z2 j+ t]
    ! T3 [  h( I7 z& f$ W5 ~},6 r$ l% E- T- p$ w9 [3 w* i1 d$ T
    {"type": "record", "name": "JobPriorityChange",
    3 e. T/ o. D$ M"fields": [- n5 p" T9 L4 h, I% n4 i0 C
    {"name": "jobid", "type": "string"},
    # |5 k4 B2 y) a& u9 J) i7 w{"name": "priority", "type": "string"}: x$ P9 x* k9 V6 Y/ `: z2 ?
    ]
    3 b& b5 j! B0 K) j) x! l},1 e& V: `! h5 y0 u2 U6 y& E1 O5 V. I
    {"type": "record", "name": "JobStatusChanged",
    " L, i7 Q( x6 N( Y"fields": [( u  {9 b* I8 b* w5 Q/ y: m
    {"name": "jobid", "type": "string"}," d1 B4 W6 W4 K0 n! h. G( G
    {"name": "jobStatus", "type": "string"}, I+ _# R4 L' B9 n
    ]& o7 \. @% B0 D9 _, x+ P$ U) m
    },
    / u; b8 C8 d+ a% _; G…]
    9 Q, Z6 c& d* K0 Q2 V}
    9 E. q7 n: h" l[1] 参见网址http://code.google.com/p/protobuf/
    + ?  ]& M5 u9 }9 Y
    [2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns: Y1 D2 X& ]3 g, _
    [3] 参见网址http://avro.apache.org/3 l* C& S: j9 b* e& s
    [4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html8 L' [( c" N# n0 y
    [5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。  & Y: F3 U) I& M- \+ Q

    7 S2 i5 D( c, {8 _) G8 f5 v2 j6 D2 e
      T: k8 d. b: D* b& ]; y& F
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-4-25 10:27 , Processed in 0.121426 second(s), 31 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表