操作 FAQ

为什么我启用多进程程序失败了?

当前 Milvus 在运行过程中,能够实现多进程操作,但在实现时需满足一定条件:

  • 程序执行时主进程中无 client
  • 每个子进程分别创建 client 进行操作

以下为正确程序的示例(部分细节如 import multiprocessing as mp 需自行修改)。

当表名为 TABLE_NAME,且已插入 vector_1 的表存在时,直接在主程序中直接调用该函数,两个 insert 进程和一个 search 进程同时执行,且能获得正确结果。其中需注意的是,search 的结果与当前正在 insert 的向量无关。

  1. def test_add_vector_search_multiprocessing():
  2. '''
  3. target: test add vectors, and search it with multiprocessing
  4. method: set vectors_1[0] as query vectors
  5. expected: status ok and result length is 1
  6. '''
  7. nq = 1000
  8. vectors_1 = gen_vec_list(nq)
  9. vectors_2 = gen_vec_list(nq)
  10. def add_vectors_search(idx):
  11. if idx == 0:
  12. MILVUS = Milvus()
  13. connect_server(MILVUS)
  14. status = add_vec_to_milvus(MILVUS, vectors_1)
  15. print("add", i, "finished")
  16. assert status.OK()
  17. elif idx == 1:
  18. MILVUS = Milvus()
  19. connect_server(MILVUS)
  20. status, result = MILVUS.search_vectors(TABLE_NAME, 1, NPROBE, [vectors_1[0]])
  21. print(result)
  22. assert status.OK()
  23. assert len(result) == 1
  24. else:
  25. MILVUS = Milvus()
  26. connect_server(MILVUS)
  27. status = add_vec_to_milvus(MILVUS, vectors_2)
  28. print("add", i, "finished")
  29. assert status.OK()
  30. process_num = 3
  31. processes = []
  32. for i in range(process_num):
  33. p = mp.Process(target=add_vectors_search, args=(i,))
  34. processes.append(p)
  35. p.start()
  36. print("process", i)
  37. for p in processes:
  38. p.join()

而若主进程中已存在 client(如利用 client 进行建表及插入),再进行多进程的操作,则会造成 client hang,最终导致 timeout。产生该结果的错误程序示例如下所示。

其中 connect 即为主进程所起 client,程序将会持续执行,直至 timeout。

  1. def test_add_vector_search_multiprocessing(self, connect, table):
  2. '''
  3. target: test add vectors, and search it with multiprocessing
  4. method: set vectors_1[0] as query vectors
  5. expected: status ok and result length is 1
  6. '''
  7. nq = 5
  8. vectors_1 = gen_vectors(nq, dim)
  9. vectors_2 = gen_vectors(nq, dim)
  10. status, ids = connect.add_vectors(table, vectors_1)
  11. time.sleep(3)
  12. status, count = connect.get_table_row_count(table)
  13. assert count == 5
  14. def add_vectors_search(connect, idx):
  15. if (idx % 2) == 0:
  16. status, ids = connect.add_vectors(table, vectors_2)
  17. assert status.OK()
  18. else:
  19. status, result = connect.search_vectors(table, 1, [vectors_1[0]])
  20. assert status.OK()
  21. assert len(result) == 1
  22. process_num = 3
  23. processes = []
  24. for i in range(process_num):
  25. p = Process(target=add_vectors_search, args=(connect, i))
  26. processes.append(p)
  27. p.start()
  28. for p in processes:
  29. p.join()

为什么搜索 top K 的向量,结果不到 K 条向量?

在 Milvus 支持的索引类型中,IVFLATIVF_SQ8 是基于 k-means 空间划分的分桶搜索算法。空间被分为 nlist 个桶,导入的向量被分配存储在基于 nlist 划分的文件结构中。搜索发生时,只搜索最近似的 nprobe 个文件。

如果 nlist 和 K 比较大,而 nprobe 又足够小时,有可能出现 nprobe 文件中的所有向量总数小于 K。当你搜索 top K 向量时,就会出现搜索结果小于 K 条向量的情况。

想要避免这种情况,您可以尝试将 nprobe 设置为更大值,或是把 nlist 和 K 设置小一点。

相关阅读

产品 FAQ