javazx 发表于 2017-3-6 14:40:59

《大规模分布式存储系统》第9章 分布式存储引擎【9.4】

9.4 ChunkServer实现机制
ChunkServer用于存储基线数据,它由如下基本部分组成:
●管理子表,主动实现子表分裂,配合RootServer实现子表迁移、删除、合并;
●SSTable,根据主键有序存储每个子表的基线数据;
●基于LRU实现块缓存(Block cache)以及行缓存(Row cache);
●实现Direct IO,磁盘IO与CPU计算并行化;
●通过定期合并&数据分发获取UpdateServer的冻结数据,从而分散到整个集
群。
每台ChunkServer服务着几千到几万个子表的基线数据,每个子表由若干个
SSTable组成(一般为1个)。下面从SSTable开始介绍ChunkServer的内部实现。
9.4.1 子表管理
每台ChunkServer服务于多个子表,子表的个数一般在10000~100000之间。
Chunk-Server内部通过ObMultiVersionTabletImage来存储每个子表的索引信息,包括数
据行数(row_count),数据量(occupy_size),校验和(check_sum),包含的
SSTable列表,所在磁盘编号(disk_no)等,代码如下:
class ObMultiVersionTabletImage
{
public:
//获取第一个包含指定数据范围的子表
//@paramrange数据范围
//@paramscan_direction正向扫描(默认)还是逆向扫描
//@paramversion子表的版本号
//@paramtablet获取的子表索引结构
int acquire_tablet(const ObNewRange&range,const ScanDirection
scan_direction,const int64_t version,ObTablet*&tablet)const;
//释放一个子表
int release_tablet(ObTablet*tablet);
//新增一个子表,load_sstable表示是否立即加载其中的SSTable文件
int add_tablet(ObTablet*tablet,const bool load_sstable=false);
//每日合并后升级子表到新版本,load_sstable表示是否立即加载新版本的SSTable文件
int upgrade_tablet(ObTablet*old_tablet,ObTablet*new_tablet,const bool
load_sstable=false);
//每日合并后升级子表到新版本,且子表发生分裂,有一个变成多个。load_sstable表示是否立即加载
分裂后的SSTable文件
int upgrade_tablet(ObTablet*old_tablet,ObTablet*new_tablets[],const int32_t
split_size,const bool load_sstable=false);
//删除一个指定数据范围和版本的子表
int remove_tablet(const ObNewRange&range,const int64_t version);
//删除一个表格对应的所有子表
int delete_table(const uint64_t table_id);
//获取下一批需要进行每日合并的子表
//@paramversion子表的版本号
//@paramsize下一批需要进行每日合并的子表个数
//@paramtablets下一批需要进行每日合并的子表索引结构
int get_tablets_for_merge(const int64_t version,int64_t&size,ObTablet*&
tablets[])const;
};
ChunkServer维护了多个版本的子表数据,每日合并后升级子表的版本号。如果
子表发生分裂,每日合并后将由一个子表变成多个子表。子表相关的操作方法包
括:
1)add_tablet:新增一个子表。如果load_sstable参数为true,那么,立即加载其
中的SSTable文件。否则,使用延迟加载策略,即读取子表时再加载其中的SSTable。
2)remove_tablet:删除一个子表。RootServer发现某个子表的副本数过多,则会
通知其中某台ChunkServer删除指定的子表。
3)delete_table:删除表格。用户执行删除表格命令时,RootServer会通知每台
ChunkServer删除表格包含的所有子表。
4)upgrade_tablet:每日合并后升级子表的版本号。如果没有发生分裂,只需要
将老子表的版本号加1;否则,将老子表替换为多个范围连续的新子表,每个新子表
的版本号均为老子表的版本号加1。
5)acquire_tablet/release_tablet:读取时首先调用acquire_tablet获取一个子表,增
加该子表的引用计数从而防止它在读取过程中被释放掉,接着读取其中的SSTable,
最后调用release_tablet释放子表。
6)get_tablets_for_merge:每日合并时通过调用该函数获取下一批需要进行每日
合并的子表。
9.4.2 SSTable
如图9-8所示,SSTable中的数据按主键排序后存放在连续的数据块(Block)
中,Block之间也有序。接着,存放数据块索引(Block Index),由每个Block最后一
行的主键(End Key)组成,用于数据查询中的Block定位。接着,存放布隆过滤器
(Bloom Filter)和表格的Schema信息。最后,存放固定大小的Trailer以及Trailer的偏
移位置。
图 9-8 SSTable格式
查找SSTable时,首先从子表的索引信息中读取SSTable Trailer的偏移位置,接着
获取Trailer信息。根据Trailer中记录的信息,可以获取块索引的大小和偏移,从而将
整个块索引加载到内存中。根据块索引记录的每个Block的最后一行的主键,可以通
过二分查找定位到查找的Block。最后将Block加载到内存中,通过二分查找Block中
记录的行索引(Row Index)查找到具体某一行。本质上看,SSTable是一个两级索引
结构:块索引以及行索引;而整个ChunkServer是一个三级索引结构:子表索引、块
索引以及行索引。
SSTable分为两种格式:稀疏格式以及稠密格式。对于稀疏格式,某些列可能存
在,也可能不存在,因此,每一行只存储包含实际值的列,每一列存储的内容为:
<列ID,列值>(<Column ID,Column Value>);而稠密格式中每一行都需要存储
所有列,每一列只需要存储列值,不需要存储列ID,这是因为列ID可以从表格
Schema中获取。
例9-4 假设有一张表格包含10列,列ID为1~10,表格中有一行的数据内容
为:
那么,如果采用稀疏格式存储,内容为:<2,20>,<3,30>,<5,50>,
<7,70>,<8,80>;如果采用稠密格式存储,内容为:null,20,30,null,
50,null,70,80,null,null。
ChunkServer中的SSTable为稠密格式,而UpdateServer中的SSTable为稀疏格式,
且存储了多张表格的数据。另外,SSTable支持列组(Column Group),将同一个列
组下的多个列的内容存储在一块。列组是一种行列混合存储模式,将每一行的所有
列分成多个组(称为列组),每个列组内部按行存储。
如图9-9所示,当一个SSTable中包含多个表格/列组时,数据按照[表格ID,列组
ID,行主键]()的形式有序存储。
图 9-9 SSTable包含多个表格/列组
另外,SSTable支持压缩功能,压缩以Block为单位。每个Block写入磁盘之前调
用压缩算法执行压缩,读取时需要解压缩。用户可以自定义SSTable的压缩算法,目
前支持的算法包括LZO以及Snappy。
SSTable的操作接口分为写入和读取两个部分,其中,写入类为
ObSSTableWriter,读取类为ObSSTableGetter(随机读取)和ObSSTableScanner(范围
查询)。代码如下:
class ObSSTableWriter
{
public:
//创建SSTable
//@paramschema表格schema信息
//@parampath SSTable在磁盘中的路径名
//@paramcompressor_name压缩算法名
//@paramstore_type SSTable格式,稀疏格式或者稠密格式
//@paramblock_size块大小,默认64KB
int create_sstable(const ObSSTableSchema&schema,const ObString&path,const
ObString&compressor_name,const int store_type,const int64_t block_size);
//往SSTable中追加一行数据
//@paramrow一行SSTable数据
//@paramspace_usage追加完这一行后SSTable大致占用的磁盘空间
int append_row(const ObSSTableRow&row,int64_t&space_usage);
//关闭SSTable,将往磁盘中写入Block Index,Bloom Filter,Schema,Trailer等信息
//@paramtrailer_offset返回SSTable的Trailer偏移量
int close_sstable(int64_t&trailer_offset);
};
定期合并&数据分发过程将产生新的SSTable,步骤如下:
1)调用create_sstable函数创建一个新的SSTable;
2)不断调用append_row函数往SSTable中追加一行行数据;
3)调用close_sstable完成SSTable写入。
与9.2.1节中的MemTableIterator一样,ObSSTableGetter和ObSSTableScanner实现了
迭代器接口,通过它可以不断地获取SSTable的下一个cell。
class ObIterator
{
public:
//迭代器移动到下一个cell
int next_cell();
//获取当前cell的内容
//@paramcell_info当前cell的内容,包括表名(table_id),行主键(row_key),列编号
(column_id)以及列值(column_value)
int get_cell(ObCellInfo**cell_info);
//获取当前cell的内容
//@paramcell_info当前cell的内容
//@param is_row_changed是否迭代到下一行
int get_cell(ObCellInfo**cell_info,bool*is_row_changed);
};
OceanBase读取的数据可能来源于MemTable,也可能来源于SSTable,或者是合
并多个MemTable和多个SSTable生成的结果。无论底层数据来源如何变化,上层的读
取接口总是ObIterator。
9.4.3 缓存实现
ChunkServer中包含三种缓存:块缓存(Block Cache)、行缓存(Row Cache)以
及块索引缓存(Block Index Cache)。其中,块缓存中存储了SSTable中访问较热的数
据块(Block),行缓存中存储了SSTable中访问较热的数据行(Row),而块索引缓
存中存储了最近访问过的SSTable的块索引(Block Index)。一般来说,块索引不会
太大,ChunkServer中所有SSTable的块索引都是常驻内存的。不同缓存的底层采用相
同的实现方式。
1.底层实现
经典的LRU缓存实现包含两个部分:哈希表和LRU链表,其中,哈希表用于查找
缓存中的元素,LRU链表用于淘汰。每次访问LRU缓存时,需要将被访问的元素移动
到LRU链表的头部,从而避免被很快淘汰,这个过程需要锁住LRU链表。
如图9-10所示,块缓存和行缓存底层都是一个Key-Value Cache,实现步骤如下:
图 9-10 Key-Value Cache的实现
1)OceanBase一次分配1MB的连续内存块(称为memblock),每个memblock包
含若干缓存项(item)。添加item时,只需要简单地将item追加到memblock的尾部;
另外,缓存淘汰以memblock为单位,而不是以item为单位。
2)OceanBase没有维护LRU链表,而是对每个memblock都维护了访问次数和最
近频繁访问时间。访问memblock中的item时将增加memblock的访问次数,如果最近一
段时间之内的访问次数超过一定值,那么,更新最近频繁访问时间;淘汰memblock
时,对所有的memblock按照最近频繁访问时间排序,淘汰最近一段时间访问较少的
memblock。可以看出,读取时只需要更新memblock的访问次数和最近频繁访问时
间,不需要移动LRU链表。这种实现方式通过牺牲LRU算法的精确性,来规避LRU链
表的全局锁冲突。
3)每个memblock维护了引用计数,读取缓存项时所在memblock的引用计数加
1,淘汰memblock时引用计数减1,引用计数为0时memblock可以回收重用。通过引用
计数,实现读取memblock中的缓存项不加锁。
2.惊群效应
以行缓存为例,假设ChunkServer中有一个热点行,ChunkServer中的N个工作线
程(假设为N=50)同时发现这一行的缓存失效,于是,所有工作线程同时读取这行
数据并更新行缓存。可以看出,N-1共49个线程不仅做了无用功,还增加了锁冲突。
这种现象称为“惊群效应”。为了解决这个问题,第一个线程发现行缓存失效时会往
缓存中加入一个fake标记,其他线程发现这个标记后会等待一段时间,直到第一个线
程从SSTable中读到这行数据并加入到行缓存后,再从行缓存中读取。
算法描述如下:
调用internal_get读取一行数据;
if(行不存在){
调用internal_set往缓存中加入一个fake标记;
从SSTable中读取数据行;
将SSTable中读到的行内容加入缓存,清除fake标记,唤醒等待线程;
返回读到的数据行;
}else if(行存在且为fake标记)
{
线程等待,直到清除fake标记;
if(等待成功)返回行缓存中的数据;
if(等待超时)返回读取超时;
}
else
{
返回行缓存中的数据;
}
3.缓存预热
ChunkServer定期合并后需要使用生成的新的SSTable提供服务,如果大量请求同
时读取新的SSTable文件,将使得ChunkServer的服务能力在切换SSTable瞬间大幅下
降。因此,这里需要一个缓存预热的过程。OceanBase最初的版本实现了主动缓存预
热,即:扫描原来的缓存,根据每个缓存项的key读取新的SSTable并将结果加入到新
的缓存中。例如,原来缓存数据项的主键分别为100、200、500,那么只需要从新的
SSTable中读取主键为100、200、500的数据并加入新的缓存。扫描完成后,原来的缓
存可以丢弃。
线上运行一段时间后发现,定期合并基本上都安排在凌晨业务低峰期,合并完
成后OceanBase集群收到的用户请求总是由少到多(早上7点之前请求很少,9点以后
请求逐步增多),能够很自然地实现被动缓存预热。由于ChunkServer在主动缓存预
热期间需要占用两倍的内存,因此,目前的线上版本放弃了这种方式,转而采用被
动缓存预热。
9.4.4 IO实现
OceanBase没有使用操作系统本身的页面缓存(page cache)机制,而是自己实现
缓存。相应地,IO也采用Direct IO实现,并且支持磁盘IO与CPU计算并行化。
ChunkServer采用Linux的Libaio 实现异步IO,并通过双缓冲区机制实现磁盘预读
与CPU处理并行化,实现步骤如下:
1)分配当前(current)以及预读(ahead)两个缓冲区;
2)使用当前缓冲区读取数据,当前缓冲区通过Libaio发起异步读取请求,接着
等待异步读取完成;
3)异步读取完成后,将当前缓冲区返回上层执行CPU计算,同时,原来的预读
缓冲区变为新的当前缓冲区,发送异步读取请求将数据读取到新的当前缓冲区。
CPU计算完成后,原来的当前缓冲区变为空闲,成为新的预读缓冲区,用于下一次
预读。
4)重复步骤3),直到所有数据全部读完。
例9-5 假设需要读取的数据范围为(1,150],分三次读取:(1,50],(50,
100],(100,150],当前和预读缓冲区分别记为A和B。实现步骤如下:
1)发送异步请求将(1,50]读取到缓冲区A,等待读取完成;
2)对缓冲区A执行CPU计算,发送异步请求,将(50,100]读取到缓冲区B;
3)如果CPU计算先于磁盘读取完成,那么,缓冲区A变为空闲,等到(50,
100]读取完成后将缓冲区B返回上层执行CPU计算,同时,发送异步请求,将
(100,150]读取到缓冲区A;
4)如果磁盘读取先于CPU计算完成,那么,首先等待缓冲区A上的CPU计算完
成,接着,将缓冲区B返回上层执行CPU计算,同时,发送异步请求,将(100,
150]读取到缓冲区A;
5)等待(100,150]读取完成后,将缓冲区A返回给上层执行CPU计算。
双缓冲区广泛用于生产者/消费者模型,ChunkServer中使用了双缓冲区异步预读
的技术,生产者为磁盘,消费者为CPU,磁盘中生产的原始数据需要给CPU计算消费
掉。
所谓“双缓冲区”,顾名思义就是两个缓冲区(简称A和B)。这两个缓冲区,总
是一个用于生产者,另一个用于消费者。当两个缓冲区都操作完,再进行一次切
换,先前被生产者写入的被消费者读取,先前消费者读取的转为生产者写入。为了
做到不冲突,给每个缓冲区分配一把互斥锁(简称La和Lb)。生产者或者消费者如
果要操作某个缓冲区,必须先拥有对应的互斥锁。
双缓冲区包括如下几种状态:
●双缓冲区都在使用的状态(并发读写)。大多数情况下,生产者和消费者都处
于并发读写状态。不妨设生产者写入A,消费者读取B。在这种状态下,生产者拥有
锁La;同样地,消费者拥有锁Lb。由于两个缓冲区都是处于独占状态,因此每次读
写缓冲区中的元素都不需要再进行加锁、解锁操作。这是节约开销的主要来源。
●单个缓冲区空闲状态。由于两个并发实体的速度会有差异,必然会出现一个缓
冲区已经操作完,而另一个尚未操作完。不妨假设生产者快于消费者。在这种情况
下,当生产者把A写满的时候,生产者要先释放La(表示它已经不再操作A),然后
尝试获取Lb。由于B还没有被读空,Lb还被消费者持有,所以生产者进入等待
(wait)状态。
●缓冲区的切换。过了若干时间,消费者终于把B读完。这时候,消费者也要先
释放Lb,然后尝试获取La。由于La刚才已经被生产者释放,所以消费者能立即拥有
La并开始读取A的数据。而由于Lb被消费者释放,所以刚才等待的生产者会苏醒过来
(wakeup)并拥有Lb,然后生产者继续往B写入数据。
Oracle公司实现的Linux异步IO库,开源地址:https://oss.oracle.com/projects/libaio-
oracle/
9.4.5 定期合并&数据分发
RootServer将UpdateServer上的版本变化信息通知ChunkServer后,ChunkServer将
执行定期合并或者数据分发。
如果UpdateServer执行了大版本冻结,ChunkServer将执行定期合并。ChunkServer
唤醒若干个定期合并线程(比如10个),每个线程执行如下流程:
1)加锁获取下一个需要定期合并的子表;
2)根据子表的主键范围读取UpdateServer中的修改操作;
3)将每行数据的基线数据和增量数据合并后,产生新的基线数据,并写入到新
的SSTable中;
4)更改子表索引信息,指向新的SSTable。
等到ChunkServer上所有的子表定期合并都执行完成后,ChunkServer会向
RootServer汇报,RootServer会更新RootTable中记录的子表版本信息。定期合并一般
安排在每天凌晨业务低峰期(凌晨1:00开始)执行一次,因此也称为每日合并。另
外,定期合并过程中ChunkServer的压力比较大,需要控制合并速度,否则可能影响
正常的读取服务。
如果UpdateServer执行了小版本冻结,ChunkServer将执行数据分发。与定期合并
不同的是,数据分发只是将UpdateServer冻结的数据缓存到ChunkServer,并不会生成
新的SSTable文件。因此,数据分发对ChunkServer造成的压力不大。
数据分发由外部读取请求驱动,当请求ChunkServer上的某个子表时,除了返回
使用者需要的数据外,还会在后台生成这个子表的数据分发任务,这个任务会获取
UpdateServer中冻结的小版本数据,并缓存在ChunkServer的内存中。如果内存用完,
数据分发任务将不再进行。当然,这里可以做一些改进,比如除了将UpdateServer分
发的数据存放到ChunkServer的内存中,还可以存储到SSD磁盘中。
例9-6 假设某台ChunkServer上有一个子表t1,t1的主键范围为(1,10],只有一
行数据:rowkey=8=>(<2,update,20>,<3,update,30>,<4,update,40
>)。UpdateServer的冻结版本有两行更新操作:rowkey=8=>(<2,update,30
>,<3,up-date,38>)和rowkey=20=>(<4,update,50>)。
●如果是大版本冻结,那么,ChunkServer上的子表t1执行定期合并后结果为:
ro-wkey=8=>(<2,update,30>,<3,update,38>,<4,update,40>);
●如果是小版本冻结,那么,ChunkServer上的子表t1执行数据分发后的结果为:
rowkey=8=>(<2,update,20>,<3,update,30>,<4,update,40>,<2,
update,30>,<3,update,38>)。
9.4.6 定期合并限速
定期合并期间系统的压力较大,需要控制定期合并的速度,避免影响正常服
务。定期合并限速的措施包括如下步骤:
1)ChunkServer:ChunkServer定期合并过程中,每合并完成若干行(默认2000
行)数据,就查看本机的负载(查看Linux系统的Load值)。如果负载过高,一部分
定期合并线程转入休眠状态;如果负载过低,唤醒更多的定期合并线程。另外,
RootServer将UpdateServer冻结的大版本通知所有的ChunkServer,每台ChunkServer会
随机等待一段时间再开始执行定期合并,防止所有的ChunkServer同时将大量的请求
发给UpdateServer。
2)UpdateServer:定期合并过程中ChunkServer需要从UpdateServer读取大量的数
据,为了防止定期合并任务用满带宽而阻塞用户的正常请求,UpdateServer将任务区
分为高优先级(用户正常请求)和低优先级(定期合并任务),并单独统计每种任
务的输出带宽。如果低优先级任务的输出带宽超过上限,降低低优先级任务的处理
速度;反之,适当提高低优先级任务的处理速度。
如果OceanBase部署了两个集群,还能够支持主备集群在不同时间段进行“错峰
合并”:一个集群执行定期合并时,把全部或大部分读写流量切到另一个集群,该集
群合并完成后,把全部或大部分流量切回,以便另一个集群接着进行定期合并。两
个集群都合并完成后,恢复正常的流量分配。


页: [1]
查看完整版本: 《大规模分布式存储系统》第9章 分布式存储引擎【9.4】