java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3068|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.2】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66153

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-14 11:11:39 | 显示全部楼层 |阅读模式
    3.2 第三方开源库
    : b- }! ~4 o. b& V% ?- S3.2.1 Protocol Buffers
    9 C- [! I7 j* t  W( |
    Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或- i; {) e$ ^1 k  w1 d
    RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持
    # ~5 H+ I3 h+ v9 I5 X# F$ z
    C++JavaPython三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers
    & W$ X" Z' o; u' _3 P4 |0 w相比于常见的
    XML格式, Protocol Buffers官方网站这样描述它的优点:
    & a4 [3 ~! z( H. M2 @- f$ W
    ❑平台无关、 语言无关;
    8 }4 P* V9 V( I( u6 J; V; v% ]
    ❑高性能, 解析速度是XML20100倍;
    $ W& s+ V; Y2 ]
    ❑体积小, 文件大小仅是XML1/101/3" B5 X* B( R+ L/ w0 \- I
    ❑使用简单;2 V8 E- e( k, W5 f
    ❑兼容性好。/ z! b) b2 C( l& c& h5 ?
    通常编写一个
    Protocol Buffers应用需要以下三步:; d. s- a  z! E4 e7 @- ]: r$ T
    1) 定义消息格式文件, 通常以proto作为扩展名;
    * z9 y4 m  S7 {
    2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++JavaPython三类语言) 的代码文件;1 f+ v8 h; N  D- s+ E0 {4 E
    3) 使用Protocol Buffers库提供的API来编写应用程序。: Q; P5 k0 F* b
    为了说明
    Protocol Buffers的使用方法, 下面给出一个简单的实例。
    ( `* R( ~3 f% K9 U0 C该实例中首先定义一个消息格式文件
    person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将
    3 ^" B+ N# p. v! ^2 R一个人的信息写入文件。2 E3 A8 ^1 g; ]+ m6 v( @9 b. p. {) ?
    步骤
    1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:
    ) E' J" [: o3 w! X/ F4 X
    package tutorial; //自定义的命名空间
    6 [% K4 w( g. U2 p$ w7 |' ?
    option java_package = "com.example.tutorial"; //生成文件的包名
    6 h# u0 M3 q* |( l
    option java_outer_classname = "PersonProtos"; //类名
    $ c, u& ?; B" W5 ^7 n( f
    message Person { //待描述的结构化数据9 w! C$ q/ Z+ P2 c+ l. {
    required string name = 1; //required表示这个字段不能为空
    + Z# g$ q$ w1 P, {) q8 b, ?; X+ C8 n
    required int32 id = 2; //数字“2”表示字段的数字别名4 a8 ?, P" o2 Y; U+ a0 |$ J  P
    optional string email = 3; //optional表示该字段可以为空) t) R- H+ r& p/ _- F% _5 y
    message PhoneNumber { //内部message( ?! o3 {# k5 l5 S6 f; l
    required string number = 1;
    3 H# T) C5 T6 |( J) roptional int32 type = 2;% ?& k( I* E+ B% @
    }r+ ]8 p; K; {# n. _
    epeated PhoneNumber phone = 4;% f+ i. Q: `) \+ ~+ f+ n
    }
    ! i. e3 Y; x: ~步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:
    , x  }& P6 S, c* n% L1 m
    protoc -java_out=. person.proto; A) R* i/ c/ q- V! b) K
    注意, 上面的命令运行时的当前路径是person.proto所在目录。. G( C, u8 v  m+ I* S  c2 Q2 a
    步骤
    3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt
    1 ~8 F. |2 R" p2 l$ C" s
    中, 之后又从文件中读出并打印出来。
    - j5 d+ h1 p5 M* f" J
    public class ProtocolBufferExample {
    2 b* g+ p0 }- Q8 c+ e# I# P3 H. n; Ustatic public void main(String[] argv) {% g# b: k* o1 L  d+ [; j( e
    Person person1 = Person.newBuilder()
    ; m0 e$ ^" I2 o6 M4 |2 u.setName("Dong Xicheng")% k7 [9 r% D& X) G" U
    .setEmail("dongxicheng@yahoo.com")- x( t' b8 q5 u; @
    .setId(11111). W3 ?0 |; r% d  p8 ~7 h% X( l
    .addPhone(Person.PhoneNumber.newBuilder()+ `, f) I7 x9 _. V3 E
    .setNumber("15110241024")
    # [  ?! X7 ^4 m) l& I.setType(0))8 Q+ _8 e3 }* n2 [! y4 j
    .addPhone(Person.PhoneNumber.newBuilder()
    % O& H. ~( N. N.setNumber("01025689654")" h3 P: f0 H4 g% U6 x
    .setType(1)).build();
    # g% G+ X( V; K& v/ n8 b/ Ntry {$ e' |- H5 ~6 ^0 c) Z
    FileOutputStream output = new FileOutputStream("example.txt");
    , f8 C# T1 W2 ?/ H' Vperson1.writeTo(output);/ \6 i  O' p) F: p+ l8 T; _
    output.close();
    $ E$ `/ F% z9 D+ T} catch(Exception e) {- E  @. p+ L8 l9 d5 V
    System.out.println("Write Error");) j3 P% ~/ e0 s8 Z5 T' c
    } t# r! i* `4 j7 r9 t
    ry {
    4 d; X2 T0 Q/ oFileInputStream input = new FileInputStream("example.txt");
    3 A, |8 \1 j6 B3 z8 x" i8 J* k+ z6 ]Person person2 = Person.parseFrom(input);
    % R$ ]- H6 ^  O4 p0 a; p+ USystem.out.println("person2:" + person2);. v# o" r% q( v, a
    } catch(Exception e) {/ \9 z9 o! H( \+ S$ A/ v
    System.out.println("Read Error!");, r. X; m% m" v* R$ |4 ]- T
    }$ o- r5 G/ O$ a
    }& G7 W% s: e* `$ I/ H3 a" Y
    }& a& I6 d9 _# L2 p" O
    YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引2 Q+ y* f7 Z, w: _8 U- \
    入使得
    YARN在向后兼容性和性能方面向前迈进了一大步。$ G: s4 ?: ]( B& s4 t( O4 o+ d
    除序列化
    /反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而
    , |) R& t9 h+ @) u) w, T1 o$ |
    YARN则采用了MRv1Hadoop RPC库, 举例如下:
    ; q: n; _( T& Y( @$ q& \
    service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义
    . V9 N. ^, `4 g. O  z) k" O
    rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);
    * D+ S9 G3 _; U* J- Y! b2 _rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);& L8 s- \* ]* ]! o5 t4 [1 w1 ?0 A+ s0 L
    rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
    + n1 T+ |* L0 j- k9 \  m+ |}
    ! g) f4 }6 S2 l' ~在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:2 {& ^0 o" P' F7 ~
    ❑applicationmaster_protocol.proto: 定义了AMRM之间的协议—Application-MasterProtocol
    6 P( T/ }: R9 l% G) x7 P1 _
    ❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol
    + Z8 L! K3 t7 h- v; }& d0 s' L! m7 N
    ❑containermanagement_protocol.proto: 定义了AMNM之间的协议—Container-ManagementProtocol/ Q* }' X/ p* g, _
    ❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议, z, F; M9 R# K: o/ r
    ResourceManagerAdministrationProtocol

    0 L! \! N6 ~! e  X3 m& u/ _  K; }* i
    ❑yarn_protos.proto: 定义了各个协议RPC的参数。
    & K0 W+ m% ], w4 G/ p) b+ K/ h
    ❑ResourceTracker.proto: 定义了NMRM之间的协议—ResourceTracker; t4 o6 }9 z% M; F. ~, W0 B2 n* |
    除了以上几个内核中的协议,
    YARN还使用Protocol BuffersMapReduce中的协议进行了重新定义:& V; g' p* r5 k5 {* r: h  ]
    ❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol
    3 w2 o5 p2 C5 b
    ❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。* ?  J, r1 Q/ i# l2 L, T/ v8 J6 w
    3.2.2 Apache Avro
    # b, z: I* o' \' u
    Apache Avro [3] Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。
    8 y! S/ t4 T, a) b2 ]
    Avro官网描述Avro的特性和功能如下:
    $ c$ H4 I* B+ d: j1 F
    ❑丰富的数据结构类型;+ ]! |! k) X3 Q2 A
    ❑快速可压缩的二进制数据形式;
    ; i, h: p( t2 v' j) M; E2 Q& i
    ❑存储持久数据的文件容器;
    , r, K: [/ O- k  [! s7 R1 b9 \
    ❑提供远程过程调用RPC( [' w3 f' F) F8 [  T. E3 ~& R
    ❑简单的动态语言结合功能。
    ; ?. b( h6 ?  k0 ~2 L3 x- P5 F  g相比于
    Apache Thrift GoogleProtocol BuffersApache Avro具有以下特点:
    $ d1 U8 }+ Y, C. ^7 l- W9 r+ ?
    ❑支持动态模式 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。" g9 N2 I9 x1 e. W
    ❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于
    - [6 T% r3 J5 B3 @4 k减少序列化后的数据大小。
    ( L" ?/ ?( y+ b; p& q
    ❑无须手工分配的域标识 ThriftProtocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域" y. g( ?. ?2 f6 W* |3 E/ v7 \: ?
    名, 该方法更加直观、 更加易扩展。- \8 p7 L- P4 O/ Z9 B
    编写一个
    Avro应用也需如下三步:
    / U4 `  e) ?3 j
    1) 定义消息格式文件, 通常以avro作为扩展名;
    7 ]# y" l9 M+ P. t0 Z
    2) 使用Avro编译器生成特定语言的代码文件( 可选) ;
    % g7 y1 T" z; Z$ H! K% B
    3) 使用Avro库提供的API来编写应用程序。
      C( Q9 ^$ e( b9 w3 K6 x, z, u( B  O下面给出一个使用实例。' ?+ w# y! Z; x- \
    步骤
    1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:
    # D8 A4 W# i& o1 ]
    {"namespace": "com.example.tutorial",
    ! k. X0 \; b. x% }$ y; q1 S8 Z"type": "record",0 k! o1 R7 ~. Z0 l) y3 ?
    "name": "Person",
    # |( w+ |8 X% u9 c0 W% }  G# f' R. d0 h"fields": [
    ) k; ]6 o8 }* M{"name": "name", "type": "string"},6 U7 F) z1 u. R* M" K% J  f
    {"name": "id", "type": "int"},
    ; \, \! y' @7 l- \{"name": "email", "type": ["string", "null"]},$ w& P. p0 @2 O" U
    {"name": "phone", "type": {"type": "array",! J* R0 N, T) P- \4 p8 ^
    "items": {"type": "record", "name": "PhoneNumber",
    9 A7 J/ Z' y% ?, S/ n"fields": [. m' w, |3 o, x! X! S. p- ]! _% j4 |
    {"name": "number", "type": "string"},
    6 Z; @# A1 R) j4 L% a{"name": "type", "type": ["int", "null"]}
    6 R* `, o, Q$ S4 D8 c0 e+ h]
    0 h" G, g; E8 t; s4 M% v' X}
    ' B+ o% X& ^1 m}1 E8 x3 n) _0 F/ J
    }]/ o( y4 c/ x  T; B1 A
    }8 q9 O+ F% s$ T& Z
    步骤2 使用Avro编译器生成Java语言, 命令如下:+ G! G5 c/ R3 a4 b1 B9 n. P. A
    java -jar avro-tools-1.7.4.jar compile schema person.avro .
    & ~- C0 q3 p5 {& E/ W注意, 上面的命令运行时的当前路径是person.avro所在目录。1 |0 d. ?; b2 [0 I: t* b9 h. A
    步骤
    3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文
    5 E* v3 a4 O5 {% k$ m& G5 k( A% p件中读出并打印。
    ; G# q0 C5 r6 M
    public class AvroExample {3 I7 W4 [' K" f9 e. v0 j" E0 D. T6 C
    static public void main(String[] argv) {
    ! n, b9 o8 |7 X- o& U9 oPhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
    $ O9 T/ {/ f- o3 w$ v) k* o' e.setNumber("15110241024")  G+ b# b4 |3 a  y
    .setType(0).build();
    5 A2 F4 P4 R: _& Q6 D8 P+ xPhoneNumber phoneNumber2 = PhoneNumber.newBuilder()4 Z( T' U! I3 a2 g+ C
    .setNumber("01025689654")
    4 M8 \8 W( E) X- X" \' {.setType(1).build();
    2 `1 v+ ~" t8 X2 _0 D  nList<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();4 s) k9 w7 I3 g: ^9 F6 R5 ~9 p; o
    phoneNumbers.add(phoneNumber1);6 U& A- P! n1 w9 o; N% V' x% l
    phoneNumbers.add(phoneNumber2);* Q  h/ ]0 M/ K* u/ i, G5 H
    Person person = Person.newBuilder()4 E/ p! @8 o# J: z& v5 @4 G
    .setName("Dong Xicheng")- ?- G" _* z6 j- e+ V& b
    .setEmail("dongxicheng@yahoo.com"), y8 b: M. C8 d% W" C
    .setId(11111)
    # ~9 S! T% h/ D; ?: _, F/ ^.setPhone(phoneNumbers).build();0 P, j$ i* d! K5 d' h
    File file = new File("person.txt");& w/ j0 B! i( P2 H/ |, O
    try {
    % F& R1 Y' q. W! y; }1 z9 P; }DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);
    , K( L$ i: n$ z. c: ~DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
    $ D5 g4 D0 L( B% g" q- Z, GdataFileWriter.create(person.getSchema(), file);' g- c" ~3 N5 N
    dataFileWriter.append(person);
    + Z. Q1 u  [. ]# z+ ldataFileWriter.close();. o' @( b# J; o9 Y5 H- G
    } catch(Exception e) {
    3 B/ y7 E# S! Y: \# sSystem.out.println("Write Error:" + e);9 t; q; o5 Z, w" K) x+ Z- ?
    }t9 E0 Y: W( `# s# X* z6 B  P* ]
    ry {
    4 c1 j; b3 O4 V! L3 k7 |DatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
    3 q: p6 M' f) e7 n# yDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);
    $ M# N  |5 `" aperson = null;5 _3 O! y# O0 j7 d0 `
    while (dataFileReader.hasNext()) {: m/ ^0 d6 e' e! o8 z# [; `
    person = dataFileReader.next(person);" }* W, ]4 M0 Y. e$ ]. ^- V" U' a5 j
    System.out.println(person);+ V8 v, X& y7 U% X
    }! a) C+ }1 z1 X6 S4 c% {
    } catch(Exception e) {- ~/ ~7 m- w% f# `# ]
    System.out.println("Read Error:" + e);5 b( T1 }' C* K& t
    }
    $ P4 r4 C+ Y0 D}
    9 ?$ w4 ^4 E& o1 ^) x}
    0 N& P5 z8 l0 X5 ]如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 0 e" K1 O' c/ w' O" X1 s5 m
    Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] YARN暂时采用Protocol Buffers作为序列化库, RPC3 S1 Z3 U  `9 {
    使用
    MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化( n' g2 r, Z# @' g/ F2 p
    均采用
    Avro完成, 相关定义在Events.avpr文件中, 举例如下:" r% x. K1 i" {2 B5 ]7 _
    {"namespace": "org.apache.hadoop.mapreduce.jobhistory",( Q0 L% J8 \- Z" P
    "protocol": "Events",( @+ D! {; U' }2 G* _- D- ]& J
    "types": [6 o+ e) Z: M$ e9 D* Y! H
    …{& P8 |' ]0 [/ i+ d
    "type": "record", "name": "JobInfoChange",
    0 b* u5 a3 ?; j, o! [" l! R"fields": [, n% n1 T% H! M! s! E. E9 [7 K
    {"name": "jobid", "type": "string"},' }6 [% f2 F. I1 A$ L6 E- j$ U0 d# {% [
    {"name": "submitTime", "type": "long"},
    0 k3 e! T2 x* x& @5 N$ a{"name": "launchTime", "type": "long"}
    ! @0 E* u5 Z/ V) d$ W]1 v* o$ e6 x$ J
    },
    # K. m- x  N7 C{"type": "record", "name": "JobPriorityChange",2 q$ ~  I" [5 Y
    "fields": [" ~  ^- S2 r8 a+ X% W
    {"name": "jobid", "type": "string"},
    ) q" W9 G  [: y" u{"name": "priority", "type": "string"}
    + S9 ]: {; e2 p/ S$ S+ f, K* e]$ k+ [3 E  X" Y) O9 C5 D7 p
    },
    2 E1 h7 }- O; y6 _* n2 M, |{"type": "record", "name": "JobStatusChanged",
    / _6 o. C1 w1 A$ c/ E0 ^# h"fields": [
    ! U( m! A8 V) R9 @- k, Q$ J{"name": "jobid", "type": "string"},% o% i5 R) _+ [# Y( d& ^& b( X6 X
    {"name": "jobStatus", "type": "string"}) s  J$ M; H3 C! T; z8 |5 g/ |1 e
    ]/ p9 X  a2 M" _8 {  h) ~
    },. H. p( F" j/ [, w" ?: N
    …]* \7 m* T3 U9 ?2 p1 A$ }- ?
    }
    / P; b! J6 O+ q+ t4 O[1] 参见网址http://code.google.com/p/protobuf/) M8 e5 ]$ O+ P: O1 o8 |
    [2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns: I' O& S* e! D& [4 o
    [3] 参见网址http://avro.apache.org/% j  e0 d2 Z; x; {7 K4 l! I* h8 u
    [4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html6 \/ L- N. u* I* z$ |- T1 ^! G
    [5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。  3 R1 L4 X8 n( u) D+ @

    / S' {% |; H; t$ _9 a
    5 \1 ]& F$ }% _4 W, s( i6 N
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-6-17 13:18 , Processed in 0.102148 second(s), 36 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表