最佳实践
流量控制
经常有业务有集中灌数据的场景,灌数据的过程可能是单机的也可能是分布式的,譬如使用Spark处理后将数据写入Pegasus中。
如果不做流控,很可能产生很高的QPS峰值写,对Pegasus系统造成较大压力:
- 写QPS太大,会影响读性能,造成读操作延迟上升;
- 写QPS太大,可能会造成集群无法承受压力而停止服务;因此,强烈建议业务方在灌数据的时候对写QPS进行流量控制。
客户端流控的思路就是:
- 首先定好总的QPS限制是多少(譬如10000/s),有多少个并发的客户端访问线程(譬如50个),然后计算出每个线程的QPS限制(譬如10000/50=200)。
对于单个客户端线程,通过流控工具将QPS限制在期望的范围内。如果超过了QPS限制,就采用简单的sleep方式来等待。我们提供了一个流控工具类com.xiaomi.infra.pegasus.tools.FlowController,把计算QPS和执行sleep的逻辑封装起来,方便用户使用。FlowController用法:
构造函数接受一个QPS参数,用于指定流量限制,譬如单线程QPS只允许200/s,就传入200;
- 用户在每次需要执行写操作之前调用cntl.getToken()方法,该方法产生两种可能:
- 如果当前未达到流量控制,则无阻塞直接返回,继续执行后面的写操作;
- 如果当前已经达到流量限制,则该方法会阻塞(sleep)一段时间才返回,以达到控制流量的效果。
- 该工具尽量配合同步接口使用,对于异步接口可能效果没那么好。使用方法很简单:
- FlowController cntl = new FlowController(qps);
- while (true) {
- // call getToken before operation
- cntl.getToken();
- client.set(...);
- }
- cntl.stop();
在分布式灌数据的场景下,用户可以先确定分布式的Task并发数,然后通过总QPS限制 / Task并发数
,得到单个Task的QPS限制,再使用FlowController进行控制。
分页查询
有时候业务需要分页查询功能,类似实现前端的分页功能。典型地,一个HashKey下有很多SortKey,一页只显示固定数量的SortKey,下一页时再显示接下来的固定数量的SortKey。
分页查询在Pegasus下有多种实现方式:
- 一次性获取HaskKey下的全部数据,在业务端缓存下来,由业务端自己实现分页逻辑。
- 实现顺序分页,可以使用multiGet()和getScanner()方法,这两者都支持SortKey的范围查询:
- 查第一页:
- startSortKey = null
- startInclusive = true
- stopSortKey = null
- stopInclusive = false
- maxFetchCount = countPerPage
- 通过判断返回值(同步接口)或者allFetched(异步接口)可以知道是否还有更多数据。如果还有更多数据,则查下一页。
- 查下一页:记录当前页的最大SortKey(假设为maxSortKey),通过MultiGetOptions或者ScanOptions指定:
- startSortKey = maxSortKey
- startInclusive = false
- stopSortKey = null
- stopInclusive = false
- maxFetchCount = countPerPage
- 查第一页:
- 实现逆序分页,请使用multiGet()方法,其支持SortKey的逆序查询:
- 查第一页:
- startSortKey = null
- startInclusive = true
- stopSortKey = null
- stopInclusive = false
- maxFetchCount = countPerPage
- reverse = true
- 通过判断返回值(同步接口)或者allFetched(异步接口)可以知道是否还有更多数据。如果还有更多数据,则查下一页。
- 查下一页:记录当前页的最小SortKey(假设为minSortKey)。如果还有更多数据,则获取下一页数据,可以在MultiGetOptions中指定:
- startSortKey = null
- startInclusive = true
- stopSortKey = minSortKey
- stopInclusive = false
- maxFetchCount = countPerPage
- reverse = true
- 查第一页:
数据序列化
Pegasus的key和value都是原始的字节串(Java中就是byte[]),而用户存储数据一般用struct或者class。因此,在将数据存储到Pegasus中时,需要将用户数据转化为字节串,这就是序列化;在从Pegasus中读取数据时,又需要将字节串转化为用户的数据结构,这就是反序列化。序列化和反序列化通常都是成对出现了,后面我们只描述序列化。
通常序列化有这些方式:
- json:好处是数据可读性好;坏处是比较占空间。不推荐。
- thrift:提供了多种Compact协议,常见的有binary协议。但是推荐用tcompact协议,因为这种协议的压缩率更高。
- protobuf:与thrift类似,推荐序列化为binary格式。对于Thrift结构,使用tcompact协议进行序列化的样例:
- import org.apache.thrift.TSerializer;
- import org.apache.thrift.protocol.TCompactProtocol;
- TSerializer serializer = new TSerializer(new TCompactProtocol.Factory());
- byte[] bytes = serializer.serialize(data);
数据压缩
对于value较大(>=2kb)的业务,我们推荐在客户端使用facebook/Zstandard压缩算法(简称 Zstd)对数据进行压缩,以减少value的数据长度,提升Pegasus的服务稳定性和读写性能。Zstd算法在压缩比和压缩速率上取得较好的平衡,适合通用场景。
从版本1.11.3-thrift-0.11.0-inlined-release开始,我们提供了Zstd压缩工具类com.xiaomi.infra.pegasus.tools.ZstdWrapper,方便用户实现压缩功能。
使用示例:
- byte[] value = "xxx";
- // write the record into pegasus
- table.set("h".getBytes(), "s".getBytes(), ZstdWrapper.compress(value), 1000);
- // read the record from pegasus
- byte[] compressedBuf = table.get("h".getBytes(), "s".getBytes(), 1000);
- // decompress the value
- byte[] orginalValue = ZstdWrapper.decompress(compressedBuf);
也可以参考测试用例代码 TestZstdWrapper.java。
以上两个优化 数据序列化 和 数据压缩 可以在客户端同时使用,都是用客户端的CPU换取Pegasus集群的稳定性和读写性能。在通常情况下这都是值得的。
有时候,业务方在开始使用Pegasus的时候,没有采用客户端压缩,但是在使用一段时间后,发现单条数据的value比较大,希望能通过压缩的办法改进性能。可以分两步:
评估压缩收益
对于已经存在的表,原来没有采用客户端压缩,如何快速评估采用客户端压缩后有多大收益?
原料:
- 业务集群:user_cluster,meta配置地址为
${user_cluster_meta_list}
,其中用户表为user_table。 - 测试集群:test_cluster,meta配置地址为
${test_cluster_meta_list}
。 - Shell工具:使用1.11.3及以上版本;修改配置文件
src/shell/config.ini
,添加访问test_cluster集群的配置项。 Java客户端工具:使用1.11.4及以上版本;修改配置文件
pegasus.properties
,设置meta_servers = ${test_cluster_meta_list}
。步骤:使用Shell工具的create命令,在test_cluster集群中新建测试表user_table_no_compress和user_table_zstd_compress:
./run.sh shell --cluster ${test_cluster_meta_list}
>>> create user_table_no_compress -p 8 -r 3
>>> create user_table_zstd_compress -p 8 -r 3
- 使用Shell工具的copy_data命令,将业务集群的user_table表的部分数据复制到测试集群的user_table_no_compress表中(在复制足够条数后通过Ctrl-C中断执行):
./run.sh shell --cluster ${user_cluster_meta_list}
>>> use user_table
>>> copy_data -c test_cluster -a user_table_no_compress
- 使用Java客户端工具的copy_data命令,将测试集群user_table_no_compress表的数据复制到user_table_zstd_compress表中,并设置数据写出时采用zstd压缩:
./PegasusCli file://./pegasus.properties user_table_no_compress \
copy_data file://./pegasus.properties user_table_zstd_compress none zstd
- 使用Shell工具的count_data命令,分别统计两个测试表的数据大小,然后计算压缩率:
./run.sh shell --cluster ${test_cluster_meta_list}
>>> use user_table_no_compress
>>> count_data -a
>>> use user_table_zstd_compress
>>> count_data -a
使用兼容性压缩
业务表原来已经有未压缩的数据,如果应用了客户端压缩,写入新的已压缩的数据,但是hashKey和sortKey保持不变,就会出现未压缩数据和已压缩数据混合存在的情况:有的value存储的是未压缩的数据,有的value存储的是已压缩的数据。
这就要求业务端在读数据的时候保证兼容性:既能读取未压缩的数据,又能读取已压缩的数据。
基于未压缩的数据采用zstd进行解压缩时基本都会失败这一事实,业务端读取的逻辑可以这样:
- 首先,尝试将客户端读到的value数据进行解压缩,如果成功,则说明是已压缩的数据。
- 如果上一步解压缩失败,则说明读到的是未压缩的数据,不需要解压。示例代码:
- // decompress the value
- byte[] decompressedValue = null;
- try {
- decompressedValue = ZstdWrapper.decompress(value);
- } catch (PException e) {
- // decompress fail
- decompressedValue = value;
- }
与此同时,可以使用后台工具将未压缩数据逐渐替换掉为已压缩数据,并在替换过程中保证数据的一致性:扫描表,逐条读取数据,如果数据是未压缩的,则将其转换为已压缩的,使用check_and_set原子操作进行数据替换。