Elasticsearch Delete/UpdateByQuery案例

bboss

Elasticsearch Delete/UpdateByQuery案例分享

本文涉及技术点:

  1. DeleteByQuery/UpdateByQuery
  2. Count文档统计Api

1.准备工作

参考文档《高性能elasticsearch ORM开发库使用介绍》导入和配置es客户端

2.编写创建索引表和初始化数据方法

创建索引表和初始化数据的组件DocumentCRUD 实现本文不做重点介绍,请访问视频教程了解:

  1. /**
  2. * 创建索引表并导入测试数据
  3. */
  4. public void initIndiceAndData(){
  5. DocumentCRUD documentCRUD = new DocumentCRUD();
  6. documentCRUD.testCreateIndice();//创建索引表
  7. documentCRUD.testBulkAddDocument();//导入测试数据
  8. }

3.定义DeleteByQuery/UpdateByQuery对应的Dsl脚本

新建配置文件-esmapper/byquery.xml

  1. <properties>
  2. <!--
  3. updateByquery
  4. deleteByquery
  5. dsl配置之文件
  6. -->
  7. <property name="updateByQuery">
  8. <![CDATA[
  9. {
  10. "query": {
  11. "bool": {
  12. "filter": [
  13. { ## 多值检索,查找多个应用名称对应的文档记录
  14. "terms": {
  15. "applicationName.keyword": [#[applicationName1],#[applicationName2]]
  16. }
  17. },
  18. { ## 时间范围检索,返回对应时间范围内的记录,接受long型的值
  19. "range": {
  20. "agentStarttime": {
  21. "gte": #[startTime],##统计开始时间
  22. "lt": #[endTime] ##统计截止时间
  23. }
  24. }
  25. }
  26. ]
  27. }
  28. }
  29. }
  30. ]]>
  31. </property>
  32. <property name="deleteByQuery">
  33. <![CDATA[
  34. {
  35. "query": {
  36. "bool": {
  37. "filter": [
  38. { ## 多值检索,查找多个应用名称对应的文档记录
  39. "terms": {
  40. "applicationName.keyword": [#[applicationName1],#[applicationName2]]
  41. }
  42. },
  43. { ## 时间范围检索,返回对应时间范围内的记录,接受long型的值
  44. "range": {
  45. "agentStarttime": {
  46. "gte": #[startTime],##统计开始时间
  47. "lt": #[endTime] ##统计截止时间
  48. }
  49. }
  50. }
  51. ]
  52. }
  53. }
  54. }
  55. ]]>
  56. </property>
  57. </properties>

4.实现DeleteByQuery功能

定义实现DeleteByQuery功能的方法

  1. /**
  2. * 验证DeleteByQuery功能
  3. * @throws ParseException
  4. */
  5. public void deleteByQuery() throws ParseException {
  6. ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/byquery.xml");
  7. //设定DeleteByQuery查询条件,通过map传递变量参数值,key对于dsl中的变量名称
  8. //dsl中有四个变量
  9. // applicationName1
  10. // applicationName2
  11. // startTime
  12. // endTime
  13. Map<String,Object> params = new HashMap<String,Object>();
  14. //设置applicationName1和applicationName2两个变量的值
  15. params.put("applicationName1","blackcatdemo2");
  16. params.put("applicationName2","blackcatdemo3");
  17. DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  18. //设置时间范围,时间参数接受long值
  19. params.put("startTime",dateFormat.parse("2017-09-02 00:00:00").getTime());
  20. params.put("endTime",new Date().getTime());
  21. //通过count api先查询数据是否存在
  22. long totalSize = clientUtil.count("demo","deleteByQuery",params);
  23. System.out.println("---------------------------------删除前查询,验证数据是否存在:totalSize="+totalSize);
  24. if(totalSize > 0) {//如果有数据,则通过by query删除之,为了实时查看删除效果,启用了强制刷新机制
  25. //执行DeleteByQuery操作
  26. String result = clientUtil.deleteByQuery("demo/_delete_by_query?refresh", "deleteByQuery", params);
  27. System.out.println(result);
  28. //删除后再次查询,验证数据是否被删除成功
  29. totalSize = clientUtil.count("demo","deleteByQuery",params);
  30. System.out.println("---------------------------------删除后再次查询,验证数据是否被删除:totalSize="+totalSize);
  31. }
  32. }

5.执行DeleteByQuery junit单元测试方法

  1. @Test
  2. public void testDeleteByQuery() throws ParseException {
  3. DeleteUdateByQuery deleteUdateByQuery = new DeleteUdateByQuery();
  4. deleteUdateByQuery.initIndiceAndData();//初始化索引表和测试数据
  5. deleteUdateByQuery.deleteByQuery();//执行deleteByquery
  6. }

6.定义updatebyquery方法

定义两个updatebyquery方法,一个带query dsl,一个不带query dsl:

  1. /**
  2. * 验证simpleUpdateByQuery功能
  3. */
  4. public void simpleUpdateByQuery(){
  5. ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
  6. String result = clientUtil.updateByQuery("demo/_update_by_query?conflicts=proceed");
  7. System.out.println("******************simpleUpdateByQuery result:"+result);
  8. }
  9. /**
  10. * 验证带查询条件UpdateByQuery功能
  11. * @throws ParseException
  12. */
  13. public void updateByQueryWithCondition() throws ParseException {
  14. ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/byquery.xml");
  15. //设定查询条件,通过map传递变量参数值,key对于dsl中的变量名称
  16. //dsl中有四个变量
  17. // applicationName1
  18. // applicationName2
  19. // startTime
  20. // endTime
  21. Map<String,Object> params = new HashMap<String,Object>();
  22. //设置applicationName1和applicationName2两个变量的值
  23. params.put("applicationName1","blackcatdemo2");
  24. params.put("applicationName2","blackcatdemo3");
  25. DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  26. //设置时间范围,时间参数接受long值
  27. params.put("startTime",dateFormat.parse("2017-09-02 00:00:00").getTime());
  28. params.put("endTime",new Date().getTime());
  29. String result = clientUtil.updateByQuery("demo/_update_by_query?conflicts=proceed","updateByQuery",params);
  30. System.out.println("******************updateByQueryWithCondition result:"+result);
  31. }

7.执行updatebyquery junit单元测试方法

  1. @Test
  2. public void testUpdateByQuery() throws ParseException {
  3. DeleteUdateByQuery deleteUdateByQuery = new DeleteUdateByQuery();
  4. deleteUdateByQuery.initIndiceAndData();//初始化索引表和测试数据
  5. deleteUdateByQuery.simpleUpdateByQuery();
  6. deleteUdateByQuery.updateByQueryWithCondition();
  7. }

8.参考文档

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html

本文示例代码对应的源码工程地址:

https://gitee.com/bboss/eshelloword

本文对应的java文件:

https://gitee.com/bboss/eshelloword/blob/master/src/main/java/org/bboss/elasticsearchtest/byquery/DeleteUdateByQuery.java

https://gitee.com/bboss/eshelloword/blob/master/src/test/java/org/bboss/elasticsearchtest/crud/DeleteUdateByQueryTest.java

本文对应的dsl配置文件:

https://gitee.com/bboss/eshelloword/blob/master/src/main/resources/esmapper/byquery.xml