所谓hot-index就是指在构建索引的过程中不会阻塞查询数据,也不会阻塞修改数据(insert/update/delete)。在TokuDB的实现中只有使用“create index“方式创建索引的情况下才能使用hot-index;如果使用“alter table add index”是会阻塞更新操作的。

TokuDB handler的ha_tokudb::store_lock判断是create index方式创建索引并且只创建一个索引会把lock_type改成TL_WRITE_ALLOW_WRITE,这是一个特殊的锁类型,意思是在执行写操作的过程允许其他的写操作。


  1. THR_LOCK_DATA* *ha_tokudb::store_lock(
  2. THD* thd,
  3. THR_LOCK_DATA** to,
  4. enum thr_lock_type lock_type) {
  5. if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) {
  6. enum_sql_command sql_command = (enum_sql_command) thd_sql_command(thd);
  7. if (!thd->in_lock_tables) {
  8. if (sql_command == SQLCOM_CREATE_INDEX &&
  9. tokudb::sysvars::create_index_online(thd)) {
  10. // hot indexing
  11. share->_num_DBs_lock.lock_read();
  12. if (share->num_DBs == (table->s->keys + tokudb_test(hidden_primary_key))) {
  13. lock_type = TL_WRITE_ALLOW_WRITE;
  14. }
  15. share->_num_DBs_lock.unlock();
  16. } else if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
  17. lock_type <= TL_WRITE) &&
  18. sql_command != SQLCOM_TRUNCATE &&
  19. !thd_tablespace_op(thd)) {
  20. // allow concurrent writes
  21. lock_type = TL_WRITE_ALLOW_WRITE;
  22. } else if (sql_command == SQLCOM_OPTIMIZE &&
  23. lock_type == TL_READ_NO_INSERT) {
  24. // hot optimize table
  25. lock_type = TL_READ;
  26. }
  27. }
  28. lock.type = lock_type;
  29. }
  30. }




  • 只创建一个索引
  • 不是unique索引
  1. int ha_tokudb::tokudb_add_index(
  2. TABLE* table_arg,
  3. KEY* key_info,
  4. uint num_of_keys,
  5. DB_TXN* txn,
  6. bool* inc_num_DBs,
  7. bool* modified_DBs) {
  8. bool use_hot_index = (lock.type == TL_WRITE_ALLOW_WRITE);
  9. creating_hot_index =
  10. use_hot_index && num_of_keys == 1 &&
  11. (key_info[0].flags & HA_NOSAME) == 0;
  12. if (use_hot_index && (share->num_DBs > curr_num_DBs)) {
  13. //
  14. // already have hot index in progress, get out
  15. //
  16. error = HA_ERR_INTERNAL_ERROR;
  17. goto cleanup;
  18. }







  1. int ha_tokudb::tokudb_add_index(
  2. TABLE* table_arg,
  3. KEY* key_info,
  4. uint num_of_keys,
  5. DB_TXN* txn,
  6. bool* inc_num_DBs,
  7. bool* modified_DBs) {
  8. // 省略前面部分代码
  9. if (creating_hot_index) {
  10. share->num_DBs++;
  11. *inc_num_DBs = true;
  12. error = db_env->create_indexer(
  13. db_env,
  14. txn,
  15. &indexer,
  16. share->file,
  17. num_of_keys,
  18. &share->key_file[curr_num_DBs],
  19. mult_db_flags,
  20. indexer_flags);
  21. if (error) {
  22. goto cleanup;
  23. }
  24. error = indexer->set_poll_function(
  25. indexer, ha_tokudb::tokudb_add_index_poll, &lc);
  26. if (error) {
  27. goto cleanup;
  28. }
  29. error = indexer->set_error_callback(
  30. indexer, ha_tokudb::loader_add_index_err, &lc);
  31. if (error) {
  32. goto cleanup;
  33. }
  34. share->_num_DBs_lock.unlock();
  35. rw_lock_taken = false;
  37. // initialize a one phase progress report.
  38. // incremental reports are done in the indexer's callback function.
  39. thd_progress_init(thd, 1);
  40. #endif
  41. error = indexer->build(indexer);
  42. if (error) {
  43. goto cleanup;
  44. }
  45. share->_num_DBs_lock.lock_write();
  46. error = indexer->close(indexer);
  47. share->_num_DBs_lock.unlock();
  48. if (error) {
  49. goto cleanup;
  50. }
  51. indexer = NULL;
  52. }

build设计思想是通过遍历pk构造二级索引。在pk上创建一个le cursor,这个cursor特别之处是读取的是MVCC结构(即leafentry)而不是数据。Le cursor遍历的方向是从正无穷(最大的key值)向前访问,一直到负无穷(最小的key值)。通过Le cursor的key和value(从MVCC中得到的)构造二级索引的key;通过pk MVCC中的事务信息,构建二级索引的MVCC。




db_env->create_indexer其实就是toku_indexer_create_indexer,是在toku_env_create阶段设置的。 在create_indexer阶段,最主要工作就是初始化DB_INDEXER数据结构。

DB_INDEXER其实是一个接口类主要定义了build,close,abort等callback函数,其主体成员变量定义在struct __toku_indexer_internal里面。 DB_INDEXER定义如下:

  1. typedef struct __toku_indexer DB_INDEXER;
  2. struct __toku_indexer_internal;
  3. struct __toku_indexer {
  4. struct __toku_indexer_internal *i;
  5. int (*set_error_callback)(DB_INDEXER *indexer, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
  6. int (*set_poll_function)(DB_INDEXER *indexer, int (*poll_func)(void *extra, float progress), void *poll_extra); /* set the polling function */
  7. int (*build)(DB_INDEXER *indexer); /* build the indexes */
  8. int (*close)(DB_INDEXER *indexer); /* finish indexing, free memory */
  9. int (*abort)(DB_INDEXER *indexer); /* abort indexing, free memory */
  10. };



  1. struct __toku_indexer_internal {
  2. DB_ENV *env;
  3. DB_TXN *txn;
  4. toku_mutex_t indexer_lock;
  5. toku_mutex_t indexer_estimate_lock;
  6. DBT position_estimate;
  7. DB *src_db;
  8. int N;
  9. DB **dest_dbs; /* [N] */
  10. uint32_t indexer_flags;
  11. void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra);
  12. void *error_extra;
  13. int (*poll_func)(void *poll_extra, float progress);
  14. void *poll_extra;
  15. uint64_t estimated_rows; // current estimate of table size
  16. uint64_t loop_mod; // how often to call poll_func
  17. LE_CURSOR lec;
  18. FILENUM *fnums; /* [N] */
  19. FILENUMS filenums;
  20. // undo state
  21. struct indexer_commit_keys commit_keys; // set of keys to commit
  22. DBT_ARRAY *hot_keys;
  23. DBT_ARRAY *hot_vals;
  24. // test functions
  25. int (*undo_do)(DB_INDEXER *indexer, DB *hotdb, DBT* key, ULEHANDLE ule);
  26. TOKUTXN_STATE (*test_xid_state)(DB_INDEXER *indexer, TXNID xid);
  27. void (*test_lock_key)(DB_INDEXER *indexer, TXNID xid, DB *hotdb, DBT *key);
  28. int (*test_delete_provisional)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
  29. int (*test_delete_committed)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
  30. int (*test_insert_provisional)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids);
  31. int (*test_insert_committed)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids);
  32. int (*test_commit_any)(DB_INDEXER *indexer, DB *db, DBT *key, XIDS xids);
  33. // test flags
  34. int test_only_flags;
  35. };


有一点需要提下,db_env->create_index调用toku_loader_create_loader创建一个dummy的索引。当build过程出错时,会放弃之前的所有操作,把索引重定向到那个dummy索引。这是利用loader redirect FT handle的功能,创建loader时指定LOADER_DISALLOW_PUTS标记。



build_index主体是一个循环,每次去pk上读取一个key。前面提到过,访问pk是通过le cursor,每次向前访问,读取key和MVCC信息,在cursor callback把相应信息填到ule_prov_info数据结构里。le cursor的callback是le_cursor_callback,通过txn_manager得到第一个uncommitted txn信息,然后在通过那个txn的txn_child_manager得到其他的uncommitted txn信息。


  1. struct ule_prov_info {
  2. // these are pointers to the allocated leafentry and ule needed to calculate
  3. // provisional info. we only borrow them - whoever created the provisional info
  4. // is responsible for cleaning up the leafentry and ule when done.
  5. LEAFENTRY le; //packed MVCC info
  6. ULEHANDLE ule; //unpacked MVCC info
  7. void* key; // key
  8. uint32_t keylen; // key length
  9. // provisional txn info for the ule
  10. uint32_t num_provisional; // uncommitted txn number
  11. uint32_t num_committed; // committed txn number
  12. TXNID *prov_ids; // each txnid for uncommitted txn
  13. TOKUTXN *prov_txns; // each txn for uncommited txn
  14. TOKUTXN_STATE *prov_states; // each txn state for uncommitted txn
  15. };
  16. static int
  17. build_index(DB_INDEXER *indexer) {
  18. int result = 0;
  19. bool done = false;
  20. for (uint64_t loop_count = 0; !done; loop_count++) {
  21. toku_indexer_lock(indexer);
  22. // grab the multi operation lock because we will be injecting messages
  23. // grab it here because we must hold it before
  24. // trying to pin any live transactions, as discovered by #5775
  25. toku_multi_operation_client_lock();
  26. // grab the next leaf entry and get its provisional info. we'll
  27. // need the provisional info for the undo-do algorithm, and we get
  28. // it here so it can be read atomically with respect to txn commit
  29. // and abort. the atomicity comes from the root-to-leaf path pinned
  30. // by the query and in the getf callback function
  31. //
  32. // this allocates space for the prov info, so we have to destroy it
  33. // when we're done.
  34. struct ule_prov_info prov_info;
  35. memset(&prov_info, 0, sizeof(prov_info));
  36. result = get_next_ule_with_prov_info(indexer, &prov_info);
  37. if (result != 0) {
  38. invariant(prov_info.ule == NULL);
  39. done = true;
  40. if (result == DB_NOTFOUND) {
  41. result = 0; // all done, normal way to exit loop successfully
  42. }
  43. }
  44. else {
  45. invariant(prov_info.le);
  46. invariant(prov_info.ule);
  47. for (int which_db = 0; (which_db < indexer->i->N) && (result == 0); which_db++) {
  48. DB *db = indexer->i->dest_dbs[which_db];
  49. DBT_ARRAY *hot_keys = &indexer->i->hot_keys[which_db];
  50. DBT_ARRAY *hot_vals = &indexer->i->hot_vals[which_db];
  51. result = indexer_undo_do(indexer, db, &prov_info, hot_keys, hot_vals);
  52. if ((result != 0) && (indexer->i->error_callback != NULL)) {
  53. // grab the key and call the error callback
  54. DBT key; toku_init_dbt_flags(&key, DB_DBT_REALLOC);
  55. toku_dbt_set(prov_info.keylen, prov_info.key, &key, NULL);
  56. indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra);
  57. toku_destroy_dbt(&key);
  58. }
  59. }
  60. // the leafentry and ule are not owned by the prov_info,
  61. // and are still our responsibility to free
  62. toku_free(prov_info.le);
  63. toku_free(prov_info.key);
  64. toku_ule_free(prov_info.ule);
  65. }
  66. toku_multi_operation_client_unlock();
  67. toku_indexer_unlock(indexer);
  68. ule_prov_info_destroy(&prov_info);
  69. if (result == 0) {
  70. result = maybe_call_poll_func(indexer, loop_count);
  71. }
  72. if (result != 0) {
  73. done = true;
  74. }
  75. }
  76. }



indexer_undo_do实现很直接,首先调用 indexer_undo_do_committed处理已提交事务对二级索引的修改,这些修改在pk上是提交的,那么在二级索引上面也一定是提交的。反复修改同一个pk会导致产生多个二级索引的key值。在pk上的体现是新值override老值;而在二级索引上就是要删老值,加新值。这也就是undo_do的意思啦。


  1. int
  2. indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) {
  3. int result = indexer_undo_do_committed(indexer, hotdb, prov_info, hot_keys, hot_vals);
  4. if (result == 0) {
  5. result = indexer_undo_do_provisional(indexer, hotdb, prov_info, hot_keys, hot_vals);
  6. }
  7. if (indexer->i->test_only_flags == INDEXER_TEST_ONLY_ERROR_CALLBACK) {
  8. result = EINVAL;
  9. }
  10. return result;
  11. }




如果当前事务状态是TOKUTXN_ABORTING,啥也不用干,省得以后在root txn commit时还要再去做rollback。

条件xrindex == num_committed表示当前事务的root txn,一定把它加到xids里面;否则,意味着是子事务,只有当它处于TOKUTXN_LIVE状态时加到xids里面。xids数组是为了往FT发msg用的,表示msg所处txn上下文。

对于provisional事务,也有undo和do阶段。针对mvcc里面的nested txn,undo阶段删除old image对应的二级索引key,do阶段添加new image对应的二级索引key。这部分跟indexer_undo_do_committed类似。

只不过indexer_undo_do_provisional需要考虑最外层provisional事务(当前alive事务的root txn)的状态。

如果是TOKUTXN_LIVE或者TOKUTXN_PREPARING表名root txn正在进行中,模拟用户写索引的行为,直接调用toku_ft_maybe_delete(删除old key)或者toku_ft_maybe_insert(添加new key),这个过程是需要记undo log和redo log的,因为pk上这个事务正在进行中。

如果最外层provisional事务(当前alive事务的root txn)的状态是TOKUTXN_COMMITTING或者TOKUTXN_RETIRED表示pk上这个事务准备提交或者已经提交,直接删除old key或者添加new key,不需要记undo log和redo log,因为pk预期是提交的。


release_txns函数unpin每个活跃的provisional事务,pin的过程是在toku_txn_pin_live_txn_unlocked做的;pin的目的是防止txn commit或者abort。

  1. static int
  2. indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) {
  3. int result = 0;
  4. indexer_commit_keys_set_empty(&indexer->i->commit_keys);
  5. ULEHANDLE ule = prov_info->ule;
  6. // init the xids to the root xid
  7. XIDS xids = toku_xids_get_root_xids();
  8. uint32_t num_provisional = prov_info->num_provisional;
  9. uint32_t num_committed = prov_info->num_committed;
  10. TXNID *prov_ids = prov_info->prov_ids;
  11. TOKUTXN *prov_txns = prov_info->prov_txns;
  12. TOKUTXN_STATE *prov_states = prov_info->prov_states;
  13. // nothing to do if there's nothing provisional
  14. if (num_provisional == 0) {
  15. goto exit;
  16. }
  17. TXNID outermost_xid_state;
  18. outermost_xid_state = prov_states[0];
  19. // scan the provisional stack from the outermost to the innermost transaction record
  20. TOKUTXN curr_txn;
  21. curr_txn = NULL;
  22. for (uint64_t xrindex = num_committed; xrindex < num_committed + num_provisional; xrindex++) {
  23. // get the ith transaction record
  24. UXRHANDLE uxr = ule_get_uxr(ule, xrindex);
  25. TXNID this_xid = uxr_get_txnid(uxr);
  26. TOKUTXN_STATE this_xid_state = prov_states[xrindex - num_committed];
  27. if (this_xid_state == TOKUTXN_ABORTING) {
  28. break; // nothing to do once we reach a transaction that is aborting
  29. }
  30. if (xrindex == num_committed) { // if this is the outermost xr
  31. result = indexer_set_xid(indexer, this_xid, &xids); // always add the outermost xid to the XIDS list
  32. curr_txn = prov_txns[xrindex - num_committed];
  33. } else {
  34. switch (this_xid_state) {
  35. case TOKUTXN_LIVE:
  36. result = indexer_append_xid(indexer, this_xid, &xids); // append a live xid to the XIDS list
  37. curr_txn = prov_txns[xrindex - num_committed];
  38. if (!indexer->i->test_xid_state) {
  39. assert(curr_txn);
  40. }
  41. break;
  43. assert(0); // not allowed
  47. break; // nothing to do
  48. }
  49. }
  50. if (result != 0)
  51. break;
  52. if (outermost_xid_state != TOKUTXN_LIVE && xrindex > num_committed) {
  53. // If the outermost is not live, then the inner state must be retired. That's the way that the txn API works.
  54. assert(this_xid_state == TOKUTXN_RETIRED);
  55. }
  56. if (uxr_is_placeholder(uxr)) {
  57. continue; // skip placeholders
  58. }
  59. // undo
  60. uint64_t prev_xrindex;
  61. bool prev_xrindex_found = indexer_find_prev_xr(indexer, ule, xrindex, &prev_xrindex);
  62. if (prev_xrindex_found) {
  63. UXRHANDLE prevuxr = ule_get_uxr(ule, prev_xrindex);
  64. if (uxr_is_delete(prevuxr)) {
  65. ; // do nothing
  66. } else if (uxr_is_insert(prevuxr)) {
  67. // generate the hot delete key
  68. result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, prevuxr, hot_keys, NULL);
  69. if (result == 0) {
  70. paranoid_invariant(hot_keys->size <= hot_keys->capacity);
  71. for (uint32_t i = 0; i < hot_keys->size; i++) {
  72. DBT *hotkey = &hot_keys->dbts[i];
  73. // send the delete message
  74. switch (outermost_xid_state) {
  75. case TOKUTXN_LIVE:
  77. invariant(this_xid_state != TOKUTXN_ABORTING);
  78. invariant(!curr_txn || toku_txn_get_state(curr_txn) == TOKUTXN_LIVE || toku_txn_get_state(curr_txn) == TOKUTXN_PREPARING);
  79. result = indexer_ft_delete_provisional(indexer, hotdb, hotkey, xids, curr_txn);
  80. if (result == 0) {
  81. indexer_lock_key(indexer, hotdb, hotkey, prov_ids[0], curr_txn);
  82. }
  83. break;
  86. result = indexer_ft_delete_committed(indexer, hotdb, hotkey, xids);
  87. if (result == 0)
  88. indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data);
  89. break;
  90. case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting
  91. assert(0);
  92. }
  93. }
  94. }
  95. } else
  96. assert(0);
  97. }
  98. if (result != 0)
  99. break;
  100. // do
  101. if (uxr_is_delete(uxr)) {
  102. ; // do nothing
  103. } else if (uxr_is_insert(uxr)) {
  104. // generate the hot insert key and val
  105. result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, uxr, hot_keys, hot_vals);
  106. if (result == 0) {
  107. paranoid_invariant(hot_keys->size == hot_vals->size);
  108. paranoid_invariant(hot_keys->size <= hot_keys->capacity);
  109. paranoid_invariant(hot_vals->size <= hot_vals->capacity);
  110. for (uint32_t i = 0; i < hot_keys->size; i++) {
  111. DBT *hotkey = &hot_keys->dbts[i];
  112. DBT *hotval = &hot_vals->dbts[i];
  113. // send the insert message
  114. switch (outermost_xid_state) {
  115. case TOKUTXN_LIVE:
  117. assert(this_xid_state != TOKUTXN_ABORTING);
  118. invariant(!curr_txn || toku_txn_get_state(curr_txn) == TOKUTXN_LIVE || toku_txn_get_state(curr_txn) == TOKUTXN_PREPARING);
  119. result = indexer_ft_insert_provisional(indexer, hotdb, hotkey, hotval, xids, curr_txn);
  120. if (result == 0) {
  121. indexer_lock_key(indexer, hotdb, hotkey, prov_ids[0], prov_txns[0]);
  122. }
  123. break;
  125. case TOKUTXN_RETIRED:
  126. result = indexer_ft_insert_committed(indexer, hotdb, hotkey, hotval, xids);
  127. // no need to do this because we do implicit commits on inserts
  128. if (0 && result == 0)
  129. indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data);
  130. break;
  131. case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting
  132. assert(0);
  133. }
  134. }
  135. }
  136. } else
  137. assert(0);
  138. if (result != 0)
  139. break;
  140. }
  141. // send commits if the outermost provisional transaction is committed
  142. for (int i = 0; result == 0 && i < indexer_commit_keys_valid(&indexer->i->commit_keys); i++) {
  143. result = indexer_ft_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids);
  144. }
  145. // be careful with this in the future. Right now, only exit path
  146. // is BEFORE we call fill_prov_info, so this happens before exit
  147. // If in the future we add a way to exit after fill_prov_info,
  148. // then this will need to be handled below exit
  149. release_txns(ule, prov_states, prov_txns, indexer);
  150. exit:
  151. toku_xids_destroy(&xids);
  152. return result;
  153. }





判断key是否落在已build好的部分是通过toku_indexer_should_insert_key函数比较le cursor正在处理的key和pk的key来实现的。为了避免访问le cursor的竞态,每次比较都是在indexer->i->indexer_lock保护下进行。直觉告诉我们,这个操作会影响性能,并发写可能会在indexer->i->indexer_lock上排队。

hot-index维护了le cursor大致位置indexer->i->position_estimate,这个位置是延迟更新的。每次访问le cursor比较后更新这个位置。那么,比它大的key一定落在build好的部分的。



需要注意的是indexer->i->position_estimate和le cursor正在处理的key(更精确)都是指pk上的位置。

  1. // a shortcut call
  2. //
  3. // a cheap(er) call to see if a key must be inserted
  4. // into the DB. If true, then we know we have to insert.
  5. // If false, then we don't know, and have to check again
  6. // after grabbing the indexer lock
  7. bool
  8. toku_indexer_may_insert(DB_INDEXER* indexer, const DBT* key) {
  9. bool may_insert = false;
  10. toku_mutex_lock(&indexer->i->indexer_estimate_lock);
  11. // if we have no position estimate, we can't tell, so return false
  12. if (indexer->i->position_estimate.data == nullptr) {
  13. may_insert = false;
  14. } else {
  15. DB *db = indexer->i->src_db;
  16. const toku::comparator &cmp = toku_ft_get_comparator(db->i->ft_handle);
  17. int c = cmp(&indexer->i->position_estimate, key);
  18. // if key > position_estimate, then we know the indexer cursor
  19. // is past key, and we can safely say that associated values of
  20. // key must be inserted into the indexer's db
  21. may_insert = c < 0;
  22. }
  23. toku_mutex_unlock(&indexer->i->indexer_estimate_lock);
  24. return may_insert;
  25. }
