最佳实践

流量控制

经常有业务有集中灌数据的场景,灌数据的过程可能是单机的也可能是分布式的,譬如使用Spark处理后将数据写入Pegasus中。

如果不做流控,很可能产生很高的QPS峰值写,对Pegasus系统造成较大压力:

  • 写QPS太大,会影响读性能,造成读操作延迟上升;
  • 写QPS太大,可能会造成集群无法承受压力而停止服务;因此,强烈建议业务方在灌数据的时候对写QPS进行流量控制。

客户端流控的思路就是:

  • 首先定好总的QPS限制是多少(譬如10000/s),有多少个并发的客户端访问线程(譬如50个),然后计算出每个线程的QPS限制(譬如10000/50=200)。
  • 对于单个客户端线程,通过流控工具将QPS限制在期望的范围内。如果超过了QPS限制,就采用简单的sleep方式来等待。我们提供了一个流控工具类com.xiaomi.infra.pegasus.tools.FlowController,把计算QPS和执行sleep的逻辑封装起来,方便用户使用。FlowController用法:

  • 构造函数接受一个QPS参数,用于指定流量限制,譬如单线程QPS只允许200/s,就传入200;

  • 用户在每次需要执行写操作之前调用cntl.getToken()方法,该方法产生两种可能:
    • 如果当前未达到流量控制,则无阻塞直接返回,继续执行后面的写操作;
    • 如果当前已经达到流量限制,则该方法会阻塞(sleep)一段时间才返回,以达到控制流量的效果。
  • 该工具尽量配合同步接口使用,对于异步接口可能效果没那么好。使用方法很简单:
  1. FlowController cntl = new FlowController(qps);
  2. while (true) {
  3. // call getToken before operation
  4. cntl.getToken();
  5. client.set(...);
  6. }
  7. cntl.stop();

在分布式灌数据的场景下,用户可以先确定分布式的Task并发数,然后通过总QPS限制 / Task并发数,得到单个Task的QPS限制,再使用FlowController进行控制。

分页查询

有时候业务需要分页查询功能,类似实现前端的分页功能。典型地,一个HashKey下有很多SortKey,一页只显示固定数量的SortKey,下一页时再显示接下来的固定数量的SortKey。

分页查询在Pegasus下有多种实现方式:

  • 一次性获取HaskKey下的全部数据,在业务端缓存下来,由业务端自己实现分页逻辑。
  • 实现顺序分页,可以使用multiGet()getScanner()方法,这两者都支持SortKey的范围查询:
    • 查第一页:
      • startSortKey = null
      • startInclusive = true
      • stopSortKey = null
      • stopInclusive = false
      • maxFetchCount = countPerPage
    • 通过判断返回值(同步接口)或者allFetched(异步接口)可以知道是否还有更多数据。如果还有更多数据,则查下一页。
    • 查下一页:记录当前页的最大SortKey(假设为maxSortKey),通过MultiGetOptions或者ScanOptions指定:
      • startSortKey = maxSortKey
      • startInclusive = false
      • stopSortKey = null
      • stopInclusive = false
      • maxFetchCount = countPerPage
  • 实现逆序分页,请使用multiGet()方法,其支持SortKey的逆序查询:
    • 查第一页:
      • startSortKey = null
      • startInclusive = true
      • stopSortKey = null
      • stopInclusive = false
      • maxFetchCount = countPerPage
      • reverse = true
    • 通过判断返回值(同步接口)或者allFetched(异步接口)可以知道是否还有更多数据。如果还有更多数据,则查下一页。
    • 查下一页:记录当前页的最小SortKey(假设为minSortKey)。如果还有更多数据,则获取下一页数据,可以在MultiGetOptions中指定:
      • startSortKey = null
      • startInclusive = true
      • stopSortKey = minSortKey
      • stopInclusive = false
      • maxFetchCount = countPerPage
      • reverse = true

数据序列化

Pegasus的key和value都是原始的字节串(Java中就是byte[]),而用户存储数据一般用struct或者class。因此,在将数据存储到Pegasus中时,需要将用户数据转化为字节串,这就是序列化;在从Pegasus中读取数据时,又需要将字节串转化为用户的数据结构,这就是反序列化。序列化和反序列化通常都是成对出现了,后面我们只描述序列化。

通常序列化有这些方式:

  • json:好处是数据可读性好;坏处是比较占空间。不推荐。
  • thrift:提供了多种Compact协议,常见的有binary协议。但是推荐用tcompact协议,因为这种协议的压缩率更高。
  • protobuf:与thrift类似,推荐序列化为binary格式。对于Thrift结构,使用tcompact协议进行序列化的样例:
  1. import org.apache.thrift.TSerializer;
  2. import org.apache.thrift.protocol.TCompactProtocol;
  3.  
  4. TSerializer serializer = new TSerializer(new TCompactProtocol.Factory());
  5. byte[] bytes = serializer.serialize(data);

数据压缩

对于value较大(>=2kb)的业务,我们推荐在客户端使用facebook/Zstandard压缩算法(简称 Zstd)对数据进行压缩,以减少value的数据长度,提升Pegasus的服务稳定性和读写性能。Zstd算法在压缩比和压缩速率上取得较好的平衡,适合通用场景。

从版本1.11.3-thrift-0.11.0-inlined-release开始,我们提供了Zstd压缩工具类com.xiaomi.infra.pegasus.tools.ZstdWrapper,方便用户实现压缩功能。

使用示例:

  1. byte[] value = "xxx";
  2.  
  3. // write the record into pegasus
  4. table.set("h".getBytes(), "s".getBytes(), ZstdWrapper.compress(value), 1000);
  5.  
  6. // read the record from pegasus
  7. byte[] compressedBuf = table.get("h".getBytes(), "s".getBytes(), 1000);
  8.  
  9. // decompress the value
  10. byte[] orginalValue = ZstdWrapper.decompress(compressedBuf);

也可以参考测试用例代码 TestZstdWrapper.java

以上两个优化 数据序列化数据压缩 可以在客户端同时使用,都是用客户端的CPU换取Pegasus集群的稳定性和读写性能。在通常情况下这都是值得的。

有时候,业务方在开始使用Pegasus的时候,没有采用客户端压缩,但是在使用一段时间后,发现单条数据的value比较大,希望能通过压缩的办法改进性能。可以分两步:

  • 评估压缩收益:评估通过客户端压缩是否能够获得足够好的压缩率。
  • 使用兼容性压缩:升级业务端使用Pegasus Java客户端的逻辑,增加客户端压缩支持,同时兼容原来未压缩的数据。

评估压缩收益

对于已经存在的表,原来没有采用客户端压缩,如何快速评估采用客户端压缩后有多大收益?

原料:

  • 业务集群:user_cluster,meta配置地址为${user_cluster_meta_list},其中用户表为user_table。
  • 测试集群:test_cluster,meta配置地址为${test_cluster_meta_list}
  • Shell工具:使用1.11.3及以上版本;修改配置文件src/shell/config.ini,添加访问test_cluster集群的配置项。
  • Java客户端工具:使用1.11.4及以上版本;修改配置文件pegasus.properties,设置meta_servers = ${test_cluster_meta_list}。步骤:

  • 使用Shell工具的create命令,在test_cluster集群中新建测试表user_table_no_compress和user_table_zstd_compress:

  1. ./run.sh shell --cluster ${test_cluster_meta_list}
  2. >>> create user_table_no_compress -p 8 -r 3
  3. >>> create user_table_zstd_compress -p 8 -r 3
  • 使用Shell工具的copy_data命令,将业务集群的user_table表的部分数据复制到测试集群的user_table_no_compress表中(在复制足够条数后通过Ctrl-C中断执行):
  1. ./run.sh shell --cluster ${user_cluster_meta_list}
  2. >>> use user_table
  3. >>> copy_data -c test_cluster -a user_table_no_compress
  • 使用Java客户端工具的copy_data命令,将测试集群user_table_no_compress表的数据复制到user_table_zstd_compress表中,并设置数据写出时采用zstd压缩:
  1. ./PegasusCli file://./pegasus.properties user_table_no_compress \
  2. copy_data file://./pegasus.properties user_table_zstd_compress none zstd
  • 使用Shell工具的count_data命令,分别统计两个测试表的数据大小,然后计算压缩率:
  1. ./run.sh shell --cluster ${test_cluster_meta_list}
  2. >>> use user_table_no_compress
  3. >>> count_data -a
  4. >>> use user_table_zstd_compress
  5. >>> count_data -a

使用兼容性压缩

业务表原来已经有未压缩的数据,如果应用了客户端压缩,写入新的已压缩的数据,但是hashKey和sortKey保持不变,就会出现未压缩数据和已压缩数据混合存在的情况:有的value存储的是未压缩的数据,有的value存储的是已压缩的数据。

这就要求业务端在读数据的时候保证兼容性:既能读取未压缩的数据,又能读取已压缩的数据。

基于未压缩的数据采用zstd进行解压缩时基本都会失败这一事实,业务端读取的逻辑可以这样:

  • 首先,尝试将客户端读到的value数据进行解压缩,如果成功,则说明是已压缩的数据。
  • 如果上一步解压缩失败,则说明读到的是未压缩的数据,不需要解压。示例代码:
  1. // decompress the value
  2. byte[] decompressedValue = null;
  3. try {
  4. decompressedValue = ZstdWrapper.decompress(value);
  5. } catch (PException e) {
  6. // decompress fail
  7. decompressedValue = value;
  8. }

与此同时,可以使用后台工具将未压缩数据逐渐替换掉为已压缩数据,并在替换过程中保证数据的一致性:扫描表,逐条读取数据,如果数据是未压缩的,则将其转换为已压缩的,使用check_and_set原子操作进行数据替换。