Elasticsearch Scroll和Slice Scroll查询API使用案例

bboss

Elasticsearch Scroll和Slice Scroll查询API使用案例

the best elasticsearch highlevel java rest api——-bboss

本文内容

  1. 基本scroll api使用
  2. 基本scroll api与自定义scorll结果集handler函数结合使用
  3. slice api使用(并行/串行)
  4. slice api使用与自定义scorll结果集handler函数结合使用(并行/串行)

本文对应的maven源码工程:

https://gitee.com/bboss/eshelloword-booter

1.dsl配置文件定义

首先定义本文需要的dsl配置文件

esmapper/scroll.xml

  1. <properties>
  2. <!--
  3. 简单的scroll query案例,复杂的条件修改query dsl即可
  4. -->
  5. <property name="scrollQuery">
  6. <![CDATA[
  7. {
  8. "size":#[size],
  9. "query": {"match_all": {}}
  10. }
  11. ]]>
  12. </property>
  13. <!--
  14. 简单的slice scroll query案例,复杂的条件修改query dsl即可
  15. -->
  16. <property name="scrollSliceQuery">
  17. <![CDATA[
  18. {
  19. "slice": {
  20. "id": #[sliceId], ## 必须使用sliceId作为变量名称
  21. "max": #[sliceMax] ## 必须使用sliceMax作为变量名称
  22. },
  23. "size":#[size],
  24. "query": {"match_all": {}}
  25. }
  26. ]]>
  27. </property>
  28. </properties>

下面介绍scroll各种用法,对应的测试类文件为:TestScrollAPIQuery

2.基本scroll api使用

  1. @Test
  2. public void testSimleScrollAPI(){
  3. ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
  4. //scroll分页检索
  5. Map params = new HashMap();
  6. params.put("size", 10000);//每页10000条记录
  7. //scroll上下文有效期1分钟,每次scroll检索的结果都会合并到总得结果集中;数据量大时存在oom内存溢出风险,大数据量时可以采用handler函数来处理每次scroll检索的结果(后面介绍)
  8. ESDatas<Map> response = clientUtil.scroll("demo/_search","scrollQuery","1m",params,Map.class);
  9. List<Map> datas = response.getDatas();
  10. long realTotalSize = datas.size();
  11. long totalSize = response.getTotalSize();
  12. System.out.println("totalSize:"+totalSize);
  13. System.out.println("realTotalSize:"+realTotalSize);
  14. System.out.println("countAll:"+clientUtil.countAll("demo"));
  15. }

3.基本scroll api与自定义scorll结果集handler函数结合使用

  1. @Test
  2. public void testSimleScrollAPIHandler(){
  3. ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
  4. //scroll分页检索
  5. Map params = new HashMap();
  6. params.put("size", 5000);//每页5000条记录
  7. //采用自定义handler函数处理每个scroll的结果集后,response中只会包含总记录数,不会包含记录集合
  8. //scroll上下文有效期1分钟;大数据量时可以采用handler函数来处理每次scroll检索的结果,规避数据量大时存在的oom内存溢出风险
  9. ESDatas<Map> response = clientUtil.scroll("demo/_search", "scrollQuery", "1m", params, Map.class, new ScrollHandler<Map>() {
  10. public void handle(ESDatas<Map> response, HandlerInfo handlerInfo)) throws Exception {//自己处理每次scroll的结果
  11. List<Map> datas = response.getDatas();
  12. long totalSize = response.getTotalSize();
  13. System.out.println("totalSize:"+totalSize+",datas.size:"+datas.size());
  14. }
  15. });
  16. System.out.println("response realzie:"+response.getTotalSize());
  17. }

4.slice api使用

串行

  1. /**
  2. * 串行方式执行slice scroll操作
  3. */
  4. @Test
  5. public void testSimpleSliceScrollApi() {
  6. ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
  7. //scroll slice分页检索,max对应并行度,一般设置为与索引表的shards数一致
  8. int max = 6;
  9. Map params = new HashMap();
  10. params.put("sliceMax", max);//建议不要大于索引表的shards数
  11. params.put("size", 100);//每页100条记录
  12. //scroll上下文有效期1分钟,每次scroll检索的结果都会合并到总得结果集中;数据量大时存在oom内存溢出风险,大数据量时可以采用handler函数来处理每次slice scroll检索的结果(后面介绍)
  13. ESDatas<Map> sliceResponse = clientUtil.scrollSlice("demo/_search",
  14. "scrollSliceQuery", params,"1m",Map.class);//串行;如果数据量大,建议采用并行方式来执行
  15. System.out.println("totalSize:"+sliceResponse.getTotalSize());
  16. System.out.println("realSize size:"+sliceResponse.getDatas().size());
  17. }

并行

  1. /**
  2. * 并行方式执行slice scroll操作
  3. */
  4. @Test
  5. public void testSimpleSliceScrollApiParral() {
  6. ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
  7. //scroll slice分页检索,max对应并行度,一般设置为与索引表的shards数一致
  8. int max = 6;
  9. Map params = new HashMap();
  10. params.put("sliceMax", max);//这里设置6个slice,建议不要大于索引表的shards数,必须使用sliceMax作为变量名称
  11. params.put("size", 100);//每页100条记录
  12. //scroll上下文有效期2分钟,每次scroll检索的结果都会合并到总得结果集中;数据量大时存在oom内存溢出风险,大数据量时可以采用handler函数来处理每次scroll检索的结果(后面介绍)
  13. ESDatas<Map> sliceResponse = clientUtil.scrollSliceParallel("demo/_search",
  14. "scrollSliceQuery", params,"2m",Map.class);//表示并行,会从slice scroll线程池中申请sliceMax个线程来并行执行slice scroll检索操作,大数据量多个shared分片的情况下建议采用并行模式
  15. System.out.println("totalSize:"+sliceResponse.getTotalSize());
  16. System.out.println("realSize size:"+sliceResponse.getDatas().size());
  17. }

5.slice api使用与自定义scorll结果集handler函数结合使用

串行

  1. /**
  2. * 串行方式执行slice scroll操作
  3. */
  4. @Test
  5. public void testSimpleSliceScrollApiHandler() {
  6. ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
  7. //scroll slice分页检索,max对应并行度,一般设置为与索引表的shards数一致
  8. int max = 6;
  9. Map params = new HashMap();
  10. params.put("sliceMax", max);//这里设置6个slice,建议不要大于索引表的shards数,必须使用sliceMax作为变量名称
  11. params.put("size", 1000);//每页1000条记录
  12. //采用自定义handler函数处理每个slice scroll的结果集后,sliceResponse中只会包含总记录数,不会包含记录集合
  13. //scroll上下文有效期1分钟,大数据量时可以采用handler函数来处理每次scroll检索的结果,规避数据量大时存在的oom内存溢出风险
  14. ESDatas<Map> sliceResponse = clientUtil.scrollSlice("demo/_search",
  15. "scrollSliceQuery", params,"1m",Map.class, new ScrollHandler<Map>() {
  16. public void handle(ESDatas<Map> response, HandlerInfo handlerInfo)) throws Exception {//自己处理每次scroll的结果
  17. List<Map> datas = response.getDatas();
  18. long totalSize = response.getTotalSize();
  19. System.out.println("totalSize:"+totalSize+",datas.size:"+datas.size());
  20. }
  21. });//串行,如果数据量大建议采用并行模式
  22. long totalSize = sliceResponse.getTotalSize();
  23. System.out.println("totalSize:"+totalSize);
  24. }

并行

  1. /**
  2. * 并行方式执行slice scroll操作
  3. */
  4. @Test
  5. public void testSimpleSliceScrollApiParralHandler() {
  6. ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/scroll.xml");
  7. //scroll slice分页检索,max对应并行度,一般设置为与索引表的shards数一致
  8. int max = 6;
  9. Map params = new HashMap();
  10. params.put("sliceMax", max);//这里设置6个slice,建议不要大于索引表的shards数,必须使用sliceMax作为变量名称
  11. params.put("size", 1000);//每页1000条记录
  12. //采用自定义handler函数处理每个slice scroll的结果集后,sliceResponse中只会包含总记录数,不会包含记录集合
  13. //scroll上下文有效期1分钟,大数据量时可以采用handler函数来处理每次scroll检索的结果,规避数据量大时存在的oom内存溢出风险
  14. ESDatas<Map> sliceResponse = clientUtil.scrollSliceParallel("demo/_search",
  15. "scrollSliceQuery", params,"1m",Map.class, new ScrollHandler<Map>() {
  16. public void handle(ESDatas<Map> response, HandlerInfo handlerInfo)) throws Exception {//自己处理每次scroll的结果,注意结果是异步检索的
  17. List<Map> datas = response.getDatas();
  18. long totalSize = response.getTotalSize();
  19. System.out.println("totalSize:"+totalSize+",datas.size:"+datas.size());
  20. }
  21. });//表示并行,会从slice scroll线程池中申请sliceMax个线程来并行执行slice scroll检索操作,大数据量多个shared分片的情况下建议采用并行模式
  22. long totalSize = sliceResponse.getTotalSize();
  23. System.out.println("totalSize:"+totalSize);
  24. }

我们可以在application.properties文件中增加以下配置来设置slice scroll查询线程池线程数和等待队列长度:

elasticsearch.sliceScrollThreadCount 默认值500

elasticsearch.sliceScrollThreadQueue 默认值500

6 开发交流

elasticsearch技术交流:166471282

elasticsearch:

bbossgroups