概述
上一篇文章中我们介绍了在RocksDB中如何在内存中查找对应的数据,这一篇我们将会详细介绍当内存中的数据不存在时,RocksDB如何在磁盘上查找对应的数据.
源码分析
依旧是从DBImpl::GetImpl开始,上一篇文章中我们分析这个函数只分析了Memtable相关的代码,这次我们来看当memtable没有查找到之后,RocksDB是如何处理的.我们可以看到当MemTable中没有找到对应的数据之后(包括删除),RocksDB将会进入对于sst的查找.
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
&range_del_agg, value_found, nullptr, nullptr, callback,
is_blob_index);
RecordTick(stats_, MEMTABLE_MISS);
}
从上面的代码我们可以看到直接从当前的version(sv->current)调用Get方法,因此接下来我们就来详细看这个函数。 这个函数简单来说就是根据所需要查找的key,然后选择对应的文件,这里每次会返回一个文件(key在sst的key范围内),然后循环查找.
先来看查找之前的初始化
GetContext get_context(
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
value, value_found, merge_context, range_del_agg, this->env_, seq,
merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob);
// Pin blocks that we read to hold merge operands
if (merge_operator_) {
pinned_iters_mgr.StartPinning();
}
FilePicker fp(
storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
user_comparator(), internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();
第一个是GetContext结构,这个类只要是根据传递进来的文件元信息来查找对应的key.然后是FilePicker,这个类主要是根据传递进来的key来选择对应的文件.这里最重要就是GetNextFile这个函数,我们来看这个函数。
这个函数他会遍历所有的level,然后再遍历每个level的所有的文件,这里会对level 0的文件做一个特殊处理,这是因为只有level0的sst的range不是有序的,因此我们每次查找需要查找所有的文件,也就是会一个个的遍历.
而在非level0,我们只需要按照二分查找来得到对应的文件即可,如果二分查找不存在,那么我就需要进入下一个level进行查找.
FdWithKeyRange* GetNextFile() {
while (!search_ended_) { // Loops over different levels.
while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
// Loops over all files in current level.
FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
hit_file_level_ = curr_level_;
is_hit_file_last_in_level_ =
curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
int cmp_largest = -1;
if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
// Check if key is within a file's range. If search left bound and
// right bound point to the same find, we are sure key falls in
// range.
assert(
curr_level_ == 0 ||
curr_index_in_curr_level_ == start_index_in_curr_level_ ||
user_comparator_->Compare(user_key_,
ExtractUserKey(f->smallest_key)) <= 0);
int cmp_smallest = user_comparator_->Compare(user_key_,
ExtractUserKey(f->smallest_key));
if (cmp_smallest >= 0) {
cmp_largest = user_comparator_->Compare(user_key_,
ExtractUserKey(f->largest_key));
}
// Setup file search bound for the next level based on the
// comparison results
if (curr_level_ > 0) {
file_indexer_->GetNextLevelIndex(curr_level_,
curr_index_in_curr_level_,
cmp_smallest, cmp_largest,
&search_left_bound_,
&search_right_bound_);
}
// Key falls out of current file's range
if (cmp_smallest < 0 || cmp_largest > 0) {
if (curr_level_ == 0) {
++curr_index_in_curr_level_;
continue;
} else {
// Search next level.
break;
}
}
}
returned_file_level_ = curr_level_;
if (curr_level_ > 0 && cmp_largest < 0) {
// No more files to search in this level.
search_ended_ = !PrepareNextLevel();
} else {
++curr_index_in_curr_level_;
}
return f;
}
// Start searching next level.
search_ended_ = !PrepareNextLevel();
}
// Search ended.
return nullptr;
}
这里RocksDB使用了一个技巧用来加快二分查找的速度,每次更新sst的时候,RocksDB都会调用FileIndexer::UpdateIndex来更新这样的一个结构,这个结构就是FileIndexer,它主要是用来保存每一个level和level+1的key范围的关联信息,这样当我们在level查找的时候,如果没有查找到信息,那么我们将会迅速得到下一个level需要查找的文件范围.每一个key来进行比较总会有三种情况:
- 小于当前sst的smallest.
- 大于当前sst的largest.
- 处于这个范围.
那么我们只需要在初始化索引的时候能够得到当前的sst在下一个level中的位置,就可以根据上面三种类型来确定下一个level我们需要进行二分查找的文件范围.在RocksDB中定义了下面三个值.
// Point to a left most file in a lower level that may contain a key,
// which compares greater than smallest of a FileMetaData (upper level)
int32_t smallest_lb;
// Point to a left most file in a lower level that may contain a key,
// which compares greater than largest of a FileMetaData (upper level)
int32_t largest_lb;
// Point to a right most file in a lower level that may contain a key,
// which compares smaller than smallest of a FileMetaData (upper level)
int32_t smallest_rb;
// Point to a right most file in a lower level that may contain a key,
// which compares smaller than largest of a FileMetaData (upper level)
int32_t largest_rb;
我们通过例子来解释这三个值.假设有下面两个level,4个sst.那么初始化的时候,对应的level1的这个sst对应的四个值分别为. smallest_lb=1;largest_lb=2;smallest_rb=1;largest_rb=2;
level 1: [50 - 60]
level 2: [1 - 40], [45 - 55], [58 - 80]
此时如果我们查找一个key为49,然后第一次比较,也就是key < level1.sst->smallest,那么我们将会知道我们需要在0和smallest_rb之间来查找,也就是0和1.假设我们查找key是55,也就是 level1.sst->smallest < key < level1.test.largest,此时我们在level2将需要在smallest_rb和largest_rb之间.这里可以看到其实就是计算一个重合的区间。
来看RocksDB如何根据当前level的比较结果来计算下一个level需要二分查找的文件范围:
void FileIndexer::GetNextLevelIndex(const size_t level, const size_t file_index,
const int cmp_smallest,
const int cmp_largest, int32_t* left_bound,
int32_t* right_bound) const {
assert(level > 0);
const IndexUnit* index_units = next_level_index_[level].index_units;
const auto& index = index_units[file_index];
if (cmp_smallest < 0) {
*left_bound = (level > 0 && file_index > 0)
? index_units[file_index - 1].largest_lb
: 0;
*right_bound = index.smallest_rb;
} else if (cmp_smallest == 0) {
*left_bound = index.smallest_lb;
*right_bound = index.smallest_rb;
} else if (cmp_smallest > 0 && cmp_largest < 0) {
*left_bound = index.smallest_lb;
*right_bound = index.largest_rb;
} else if (cmp_largest == 0) {
*left_bound = index.largest_lb;
*right_bound = index.largest_rb;
} else if (cmp_largest > 0) {
*left_bound = index.largest_lb;
*right_bound = level_rb_[level + 1];
} else {
assert(false);
}
}
看完上面这些我们继续来看RocksDB对于文件的查找.这里所有对于key的查找都是在table_cache_->Get中.这里我们暂且略过这个函数的实现,最后我们再来详细分析这个函数.
while (f != nullptr) {
................................
*status = table_cache_->Get(
read_options, *internal_comparator(), f->fd, ikey, &get_context,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
fp.GetCurrentLevel());
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;
}
.......................
}
当table_cache_->Get返回之后,我们需要根据get_context来判断返回的结果
switch (get_context.State()) {
case GetContext::kNotFound:
// Keep searching in other files
break;
case GetContext::kMerge:
break;
case GetContext::kFound:
if (fp.GetHitFileLevel() == 0) {
RecordTick(db_statistics_, GET_HIT_L0);
} else if (fp.GetHitFileLevel() == 1) {
RecordTick(db_statistics_, GET_HIT_L1);
} else if (fp.GetHitFileLevel() >= 2) {
RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
}
return;
case GetContext::kDeleted:
// Use empty error message for speed
*status = Status::NotFound();
return;
case GetContext::kCorrupt:
*status = Status::Corruption("corrupted key for ", user_key);
return;
case GetContext::kBlobIndex:
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
*status = Status::NotSupported(
"Encounter unexpected blob index. Please open DB with "
"rocksdb::blob_db::BlobDB instead.");
return;
}
如果没有发现对应的值则进入下一次文件查找
f = fp.GetNextFile();
最后我们来详细分析最核心的函数TableCache::Get,这个函数不仅仅是返回对应的查找结果,并且还会cache相应的文件信息,并且如果row_cache打开,他还会做row cache.这里row cache就是对当前的所需要查找的key在当前sst中对应的value进行cache.
先来看如果打开了row cache,RocksDB将会如何处理,首先它会计算row cache的key.通过下面的代码我们可以看到row cache的key就是fd_number+seq_no+user_key.
uint64_t fd_number = fd.GetNumber();
auto user_key = ExtractUserKey(k);
// We use the user key as cache key instead of the internal key,
// otherwise the whole cache would be invalidated every time the
// sequence key increases. However, to support caching snapshot
// reads, we append the sequence number (incremented by 1 to
// distinguish from 0) only in this case.
uint64_t seq_no =
options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);
// Compute row cache key.
row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
row_cache_id_.size());
AppendVarint64(&row_cache_key, fd_number);
AppendVarint64(&row_cache_key, seq_no);
row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
user_key.size());
然后就是在row cache中进行一次查找.如果有对应的值则直接返回结果,否则则将会在对应的sst读取传递进来的key.
if (auto row_handle =
ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
Cleanable value_pinner;
auto release_cache_entry_func = [](void* cache_to_clean,
void* cache_handle) {
((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
};
auto found_row_cache_entry = static_cast<const std::string*>(
ioptions_.row_cache->Value(row_handle));
....................................
done = true;
} else {
// Not found, setting up the replay log.
RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
row_cache_entry = &row_cache_entry_buffer;
}
接下来就是需要在对应的sst文件读取对应的key的值,这里可以看到每一个fd都包含了一个TableReader的结构,这个结构就是用来保存文件的内容.而我们的table_cache主要就是缓存这个结构.
Status s;
TableReader* t = fd.table_reader;
Cache::Handle* handle = nullptr;
if (!done && s.ok()) {
if (t == nullptr) {
s = FindTable(env_options_, internal_comparator, fd, &handle,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters,
level);
if (s.ok()) {
t = GetTableReaderFromHandle(handle);
}
}
..........................
}
上面的代码会直接调用TableCache::FindTable, 这个函数主要是用来实现对应tablereader的读取以及row cache.
Status TableCache::FindTable(const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, Cache::Handle** handle,
const bool no_io, bool record_read_stats,
HistogramImpl* file_read_hist, bool skip_filters,
int level,
bool prefetch_index_and_filter_in_cache) {
...................................................
if (*handle == nullptr) {
if (no_io) { // Don't do IO and return a not-found status
return Status::Incomplete("Table not found in table_cache, no_io is set");
}
unique_ptr<TableReader> table_reader;
s = GetTableReader(env_options, internal_comparator, fd,
false /* sequential mode */, 0 /* readahead */,
record_read_stats, file_read_hist, &table_reader,
skip_filters, level, prefetch_index_and_filter_in_cache);
if (!s.ok()) {
assert(table_reader == nullptr);
RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
handle);
if (s.ok()) {
// Release ownership of table reader.
table_reader.release();
}
}
}
return s;
}
通过上面的代码可以看到实现很简单,就是一般的cache逻辑,读取然后判断是否存在,不存在则插入到cache. 上面的函数会调用 TableCache::GetTableReader,我们来简单看下这个函数.
Status TableCache::GetTableReader(
const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
bool sequential_mode, size_t readahead, bool record_read_stats,
HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
bool skip_filters, int level, bool prefetch_index_and_filter_in_cache,
bool for_compaction) {
..........................................
if (s.ok()) {
...............................................
s = ioptions_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, env_options, internal_comparator,
skip_filters, level),
std::move(file_reader), fd.GetFileSize(), table_reader,
prefetch_index_and_filter_in_cache);
TEST_SYNC_POINT("TableCache::GetTableReader:0");
}
return s;
}
可以看到最关键的调用就是调用ioptions_.table_factory->NewTableReader, 这里RocksDB会根据我们配置的不同的sst格式来调用不同的reader,而在RocksDB中默认的格式是基于block.
// Create default block based table factory.
extern TableFactory* NewBlockBasedTableFactory(
const BlockBasedTableOptions& table_options = BlockBasedTableOptions());
这里我们就不详细分析sst的文件格式了,以后我们会来详细对比这几个文件格式的优劣.这里我们只需要知道最终缓存的tablereader就是一个BlockBasedTable对象(假设使用了基于block的sst format).
当读取完毕TableReader之后,RocksDB就需要从sst文件中get key了,也就是最终的key查找方式是在每个sst format class的Get方法中实现的。
if (s.ok()) {
get_context->SetReplayLog(row_cache_entry); // nullptr if no cache.
s = t->Get(options, k, get_context, skip_filters);
get_context->SetReplayLog(nullptr);
}
和上面一样,这里的get也就是对应的sst format的get.
最后如果查找到key,则开始缓存对应的kv到row_cache.
size_t charge =
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(*row_cache_entry));
ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
&DeleteEntry<std::string>);
这里整个读取流程我们都分析完毕了,不过这里略过了merge,delete range以及不同sst format如何组织以及读取内容,后续我们会详细分析这些略过的内容.