概述

上一篇文章中我们介绍了在RocksDB中如何在内存中查找对应的数据,这一篇我们将会详细介绍当内存中的数据不存在时,RocksDB如何在磁盘上查找对应的数据.

源码分析

依旧是从DBImpl::GetImpl开始,上一篇文章中我们分析这个函数只分析了Memtable相关的代码,这次我们来看当memtable没有查找到之后,RocksDB是如何处理的.我们可以看到当MemTable中没有找到对应的数据之后(包括删除),RocksDB将会进入对于sst的查找.

  1. if (!done) {
  2. PERF_TIMER_GUARD(get_from_output_files_time);
  3. sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
  4. &range_del_agg, value_found, nullptr, nullptr, callback,
  5. is_blob_index);
  6. RecordTick(stats_, MEMTABLE_MISS);
  7. }

从上面的代码我们可以看到直接从当前的version(sv->current)调用Get方法,因此接下来我们就来详细看这个函数。 这个函数简单来说就是根据所需要查找的key,然后选择对应的文件,这里每次会返回一个文件(key在sst的key范围内),然后循环查找.

先来看查找之前的初始化

  1. GetContext get_context(
  2. user_comparator(), merge_operator_, info_log_, db_statistics_,
  3. status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
  4. value, value_found, merge_context, range_del_agg, this->env_, seq,
  5. merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
  6. // Pin blocks that we read to hold merge operands
  7. if (merge_operator_) {
  8. pinned_iters_mgr.StartPinning();
  9. }
  10. FilePicker fp(
  11. storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
  12. storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
  13. user_comparator(), internal_comparator());
  14. FdWithKeyRange* f = fp.GetNextFile();

第一个是GetContext结构,这个类只要是根据传递进来的文件元信息来查找对应的key.然后是FilePicker,这个类主要是根据传递进来的key来选择对应的文件.这里最重要就是GetNextFile这个函数,我们来看这个函数。

这个函数他会遍历所有的level,然后再遍历每个level的所有的文件,这里会对level 0的文件做一个特殊处理,这是因为只有level0的sst的range不是有序的,因此我们每次查找需要查找所有的文件,也就是会一个个的遍历.

而在非level0,我们只需要按照二分查找来得到对应的文件即可,如果二分查找不存在,那么我就需要进入下一个level进行查找.

  1. FdWithKeyRange* GetNextFile() {
  2. while (!search_ended_) { // Loops over different levels.
  3. while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
  4. // Loops over all files in current level.
  5. FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
  6. hit_file_level_ = curr_level_;
  7. is_hit_file_last_in_level_ =
  8. curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
  9. int cmp_largest = -1;
  10. if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
  11. // Check if key is within a file's range. If search left bound and
  12. // right bound point to the same find, we are sure key falls in
  13. // range.
  14. assert(
  15. curr_level_ == 0 ||
  16. curr_index_in_curr_level_ == start_index_in_curr_level_ ||
  17. user_comparator_->Compare(user_key_,
  18. ExtractUserKey(f->smallest_key)) <= 0);
  19. int cmp_smallest = user_comparator_->Compare(user_key_,
  20. ExtractUserKey(f->smallest_key));
  21. if (cmp_smallest >= 0) {
  22. cmp_largest = user_comparator_->Compare(user_key_,
  23. ExtractUserKey(f->largest_key));
  24. }
  25. // Setup file search bound for the next level based on the
  26. // comparison results
  27. if (curr_level_ > 0) {
  28. file_indexer_->GetNextLevelIndex(curr_level_,
  29. curr_index_in_curr_level_,
  30. cmp_smallest, cmp_largest,
  31. &search_left_bound_,
  32. &search_right_bound_);
  33. }
  34. // Key falls out of current file's range
  35. if (cmp_smallest < 0 || cmp_largest > 0) {
  36. if (curr_level_ == 0) {
  37. ++curr_index_in_curr_level_;
  38. continue;
  39. } else {
  40. // Search next level.
  41. break;
  42. }
  43. }
  44. }
  45. returned_file_level_ = curr_level_;
  46. if (curr_level_ > 0 && cmp_largest < 0) {
  47. // No more files to search in this level.
  48. search_ended_ = !PrepareNextLevel();
  49. } else {
  50. ++curr_index_in_curr_level_;
  51. }
  52. return f;
  53. }
  54. // Start searching next level.
  55. search_ended_ = !PrepareNextLevel();
  56. }
  57. // Search ended.
  58. return nullptr;
  59. }

这里RocksDB使用了一个技巧用来加快二分查找的速度,每次更新sst的时候,RocksDB都会调用FileIndexer::UpdateIndex来更新这样的一个结构,这个结构就是FileIndexer,它主要是用来保存每一个level和level+1的key范围的关联信息,这样当我们在level查找的时候,如果没有查找到信息,那么我们将会迅速得到下一个level需要查找的文件范围.每一个key来进行比较总会有三种情况:

  • 小于当前sst的smallest.
  • 大于当前sst的largest.
  • 处于这个范围.

那么我们只需要在初始化索引的时候能够得到当前的sst在下一个level中的位置,就可以根据上面三种类型来确定下一个level我们需要进行二分查找的文件范围.在RocksDB中定义了下面三个值.

  1. // Point to a left most file in a lower level that may contain a key,
  2. // which compares greater than smallest of a FileMetaData (upper level)
  3. int32_t smallest_lb;
  4. // Point to a left most file in a lower level that may contain a key,
  5. // which compares greater than largest of a FileMetaData (upper level)
  6. int32_t largest_lb;
  7. // Point to a right most file in a lower level that may contain a key,
  8. // which compares smaller than smallest of a FileMetaData (upper level)
  9. int32_t smallest_rb;
  10. // Point to a right most file in a lower level that may contain a key,
  11. // which compares smaller than largest of a FileMetaData (upper level)
  12. int32_t largest_rb;

我们通过例子来解释这三个值.假设有下面两个level,4个sst.那么初始化的时候,对应的level1的这个sst对应的四个值分别为. smallest_lb=1;largest_lb=2;smallest_rb=1;largest_rb=2;

  1. level 1: [50 - 60]
  2. level 2: [1 - 40], [45 - 55], [58 - 80]

此时如果我们查找一个key为49,然后第一次比较,也就是key < level1.sst->smallest,那么我们将会知道我们需要在0和smallest_rb之间来查找,也就是0和1.假设我们查找key是55,也就是 level1.sst->smallest < key < level1.test.largest,此时我们在level2将需要在smallest_rb和largest_rb之间.这里可以看到其实就是计算一个重合的区间。

来看RocksDB如何根据当前level的比较结果来计算下一个level需要二分查找的文件范围:

  1. void FileIndexer::GetNextLevelIndex(const size_t level, const size_t file_index,
  2. const int cmp_smallest,
  3. const int cmp_largest, int32_t* left_bound,
  4. int32_t* right_bound) const {
  5. assert(level > 0);
  6. const IndexUnit* index_units = next_level_index_[level].index_units;
  7. const auto& index = index_units[file_index];
  8. if (cmp_smallest < 0) {
  9. *left_bound = (level > 0 && file_index > 0)
  10. ? index_units[file_index - 1].largest_lb
  11. : 0;
  12. *right_bound = index.smallest_rb;
  13. } else if (cmp_smallest == 0) {
  14. *left_bound = index.smallest_lb;
  15. *right_bound = index.smallest_rb;
  16. } else if (cmp_smallest > 0 && cmp_largest < 0) {
  17. *left_bound = index.smallest_lb;
  18. *right_bound = index.largest_rb;
  19. } else if (cmp_largest == 0) {
  20. *left_bound = index.largest_lb;
  21. *right_bound = index.largest_rb;
  22. } else if (cmp_largest > 0) {
  23. *left_bound = index.largest_lb;
  24. *right_bound = level_rb_[level + 1];
  25. } else {
  26. assert(false);
  27. }
  28. }

看完上面这些我们继续来看RocksDB对于文件的查找.这里所有对于key的查找都是在table_cache_->Get中.这里我们暂且略过这个函数的实现,最后我们再来详细分析这个函数.

  1. while (f != nullptr) {
  2. ................................
  3. *status = table_cache_->Get(
  4. read_options, *internal_comparator(), f->fd, ikey, &get_context,
  5. cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
  6. IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
  7. fp.IsHitFileLastInLevel()),
  8. fp.GetCurrentLevel());
  9. // TODO: examine the behavior for corrupted key
  10. if (!status->ok()) {
  11. return;
  12. }
  13. .......................
  14. }

当table_cache_->Get返回之后,我们需要根据get_context来判断返回的结果

  1. switch (get_context.State()) {
  2. case GetContext::kNotFound:
  3. // Keep searching in other files
  4. break;
  5. case GetContext::kMerge:
  6. break;
  7. case GetContext::kFound:
  8. if (fp.GetHitFileLevel() == 0) {
  9. RecordTick(db_statistics_, GET_HIT_L0);
  10. } else if (fp.GetHitFileLevel() == 1) {
  11. RecordTick(db_statistics_, GET_HIT_L1);
  12. } else if (fp.GetHitFileLevel() >= 2) {
  13. RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
  14. }
  15. return;
  16. case GetContext::kDeleted:
  17. // Use empty error message for speed
  18. *status = Status::NotFound();
  19. return;
  20. case GetContext::kCorrupt:
  21. *status = Status::Corruption("corrupted key for ", user_key);
  22. return;
  23. case GetContext::kBlobIndex:
  24. ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
  25. *status = Status::NotSupported(
  26. "Encounter unexpected blob index. Please open DB with "
  27. "rocksdb::blob_db::BlobDB instead.");
  28. return;
  29. }

如果没有发现对应的值则进入下一次文件查找

  1. f = fp.GetNextFile();

最后我们来详细分析最核心的函数TableCache::Get,这个函数不仅仅是返回对应的查找结果,并且还会cache相应的文件信息,并且如果row_cache打开,他还会做row cache.这里row cache就是对当前的所需要查找的key在当前sst中对应的value进行cache.

先来看如果打开了row cache,RocksDB将会如何处理,首先它会计算row cache的key.通过下面的代码我们可以看到row cache的key就是fd_number+seq_no+user_key.

  1. uint64_t fd_number = fd.GetNumber();
  2. auto user_key = ExtractUserKey(k);
  3. // We use the user key as cache key instead of the internal key,
  4. // otherwise the whole cache would be invalidated every time the
  5. // sequence key increases. However, to support caching snapshot
  6. // reads, we append the sequence number (incremented by 1 to
  7. // distinguish from 0) only in this case.
  8. uint64_t seq_no =
  9. options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);
  10. // Compute row cache key.
  11. row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
  12. row_cache_id_.size());
  13. AppendVarint64(&row_cache_key, fd_number);
  14. AppendVarint64(&row_cache_key, seq_no);
  15. row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
  16. user_key.size());

然后就是在row cache中进行一次查找.如果有对应的值则直接返回结果,否则则将会在对应的sst读取传递进来的key.

  1. if (auto row_handle =
  2. ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
  3. Cleanable value_pinner;
  4. auto release_cache_entry_func = [](void* cache_to_clean,
  5. void* cache_handle) {
  6. ((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
  7. };
  8. auto found_row_cache_entry = static_cast<const std::string*>(
  9. ioptions_.row_cache->Value(row_handle));
  10. ....................................
  11. done = true;
  12. } else {
  13. // Not found, setting up the replay log.
  14. RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
  15. row_cache_entry = &row_cache_entry_buffer;
  16. }

接下来就是需要在对应的sst文件读取对应的key的值,这里可以看到每一个fd都包含了一个TableReader的结构,这个结构就是用来保存文件的内容.而我们的table_cache主要就是缓存这个结构.

  1. Status s;
  2. TableReader* t = fd.table_reader;
  3. Cache::Handle* handle = nullptr;
  4. if (!done && s.ok()) {
  5. if (t == nullptr) {
  6. s = FindTable(env_options_, internal_comparator, fd, &handle,
  7. options.read_tier == kBlockCacheTier /* no_io */,
  8. true /* record_read_stats */, file_read_hist, skip_filters,
  9. level);
  10. if (s.ok()) {
  11. t = GetTableReaderFromHandle(handle);
  12. }
  13. }
  14. ..........................
  15. }

上面的代码会直接调用TableCache::FindTable, 这个函数主要是用来实现对应tablereader的读取以及row cache.

  1. Status TableCache::FindTable(const EnvOptions& env_options,
  2. const InternalKeyComparator& internal_comparator,
  3. const FileDescriptor& fd, Cache::Handle** handle,
  4. const bool no_io, bool record_read_stats,
  5. HistogramImpl* file_read_hist, bool skip_filters,
  6. int level,
  7. bool prefetch_index_and_filter_in_cache) {
  8. ...................................................
  9. if (*handle == nullptr) {
  10. if (no_io) { // Don't do IO and return a not-found status
  11. return Status::Incomplete("Table not found in table_cache, no_io is set");
  12. }
  13. unique_ptr<TableReader> table_reader;
  14. s = GetTableReader(env_options, internal_comparator, fd,
  15. false /* sequential mode */, 0 /* readahead */,
  16. record_read_stats, file_read_hist, &table_reader,
  17. skip_filters, level, prefetch_index_and_filter_in_cache);
  18. if (!s.ok()) {
  19. assert(table_reader == nullptr);
  20. RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
  21. // We do not cache error results so that if the error is transient,
  22. // or somebody repairs the file, we recover automatically.
  23. } else {
  24. s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
  25. handle);
  26. if (s.ok()) {
  27. // Release ownership of table reader.
  28. table_reader.release();
  29. }
  30. }
  31. }
  32. return s;
  33. }

通过上面的代码可以看到实现很简单,就是一般的cache逻辑,读取然后判断是否存在,不存在则插入到cache. 上面的函数会调用 TableCache::GetTableReader,我们来简单看下这个函数.

  1. Status TableCache::GetTableReader(
  2. const EnvOptions& env_options,
  3. const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
  4. bool sequential_mode, size_t readahead, bool record_read_stats,
  5. HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
  6. bool skip_filters, int level, bool prefetch_index_and_filter_in_cache,
  7. bool for_compaction) {
  8. ..........................................
  9. if (s.ok()) {
  10. ...............................................
  11. s = ioptions_.table_factory->NewTableReader(
  12. TableReaderOptions(ioptions_, env_options, internal_comparator,
  13. skip_filters, level),
  14. std::move(file_reader), fd.GetFileSize(), table_reader,
  15. prefetch_index_and_filter_in_cache);
  16. TEST_SYNC_POINT("TableCache::GetTableReader:0");
  17. }
  18. return s;
  19. }

可以看到最关键的调用就是调用ioptions_.table_factory->NewTableReader, 这里RocksDB会根据我们配置的不同的sst格式来调用不同的reader,而在RocksDB中默认的格式是基于block.

  1. // Create default block based table factory.
  2. extern TableFactory* NewBlockBasedTableFactory(
  3. const BlockBasedTableOptions& table_options = BlockBasedTableOptions());

这里我们就不详细分析sst的文件格式了,以后我们会来详细对比这几个文件格式的优劣.这里我们只需要知道最终缓存的tablereader就是一个BlockBasedTable对象(假设使用了基于block的sst format).

当读取完毕TableReader之后,RocksDB就需要从sst文件中get key了,也就是最终的key查找方式是在每个sst format class的Get方法中实现的。

  1. if (s.ok()) {
  2. get_context->SetReplayLog(row_cache_entry); // nullptr if no cache.
  3. s = t->Get(options, k, get_context, skip_filters);
  4. get_context->SetReplayLog(nullptr);
  5. }

和上面一样,这里的get也就是对应的sst format的get.

最后如果查找到key,则开始缓存对应的kv到row_cache.

  1. size_t charge =
  2. row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
  3. void* row_ptr = new std::string(std::move(*row_cache_entry));
  4. ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
  5. &DeleteEntry<std::string>);

这里整个读取流程我们都分析完毕了,不过这里略过了merge,delete range以及不同sst format如何组织以及读取内容,后续我们会详细分析这些略过的内容.

原文:http://mysql.taobao.org/monthly/2018/12/08/