概述

在RocksDB 3.0中加入了Column Family特性,加入这个特性之后,每一个KV对都会关联一个Column Family,其中默认的Column Family是 “default”. Column Family主要是提供给RocksDB一个逻辑的分区.从实现上来看不同的Column Family共享WAL,而都有自己的Memtable和SST.这就意味着我们可以很 快速已经方便的设置不同的属性给不同的Column Family以及快速删除对应的Column Family.

主要API

首先是创建Column Family,这里注意我们可以通过两种方式来创建Column Family,一种是在Open DB的时候通过传递需要创建的Column Family,一种是当DB创建并打开之后, 通过直接的CreateColumnFamily来创建Column Family.

  1. DB::Open(const DBOptions& db_options, const std::string& name, const std::vector<ColumnFamilyDescriptor>& column_families, std::vector<ColumnFamilyHandle*>* handles, DB** dbptr);
  2. DB::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle);

这里可以看到不管是哪一种方式最终都会返回一个ColumnFamilyHandle给调用者来使用.

然后就是删除Column Family的方式,这里很简单就是传递之前创建的ColumnFamilyHandle给RocksDB,然后用以删除.

  1. DropColumnFamily(ColumnFamilyHandle* column_family);

实现

所有的Column Family都是通过一个叫做ColumnFamilySet的结构来管理的,而每一个Column Family都是一个ColumnFamilyData.

先来看ColumnFamilySet,这里可以看到它有两个数据结构来管理Column Family,分别是map(column_family_data_)以及一个双向链表(dummy_cfd_). 其中map用来保存Column Family名字和对应的id以及ColumnFamilyData的映射. 这里要注意在RocksDB内部是将没一个ColumnFamily的名字表示为一个uint32类型的ID(max_column_family_).也就是这个ID是一个简单的递增的数值.

  1. class ColumnFamilySet {
  2. public:
  3. // ColumnFamilySet supports iteration
  4. public:
  5. .................................
  6. ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
  7. Version* dummy_version,
  8. const ColumnFamilyOptions& options);
  9. iterator begin() { return iterator(dummy_cfd_->next_); }
  10. iterator end() { return iterator(dummy_cfd_); }
  11. ...............................
  12. private:
  13. friend class ColumnFamilyData;
  14. // helper function that gets called from cfd destructor
  15. // REQUIRES: DB mutex held
  16. void RemoveColumnFamily(ColumnFamilyData* cfd);
  17. // column_families_ and column_family_data_ need to be protected:
  18. // * when mutating both conditions have to be satisfied:
  19. // 1. DB mutex locked
  20. // 2. thread currently in single-threaded write thread
  21. // * when reading, at least one condition needs to be satisfied:
  22. // 1. DB mutex locked
  23. // 2. accessed from a single-threaded write thread
  24. std::unordered_map<std::string, uint32_t> column_families_;
  25. std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
  26. uint32_t max_column_family_;
  27. ColumnFamilyData* dummy_cfd_;
  28. // We don't hold the refcount here, since default column family always exists
  29. // We are also not responsible for cleaning up default_cfd_cache_. This is
  30. // just a cache that makes common case (accessing default column family)
  31. // faster
  32. ColumnFamilyData* default_cfd_cache_;
  33. ..................................
  34. };

然后来看ColumnFamilyData,这个数据结构就是用来表示一个ColumnFamily,保存了对应的信息,我们可以看到有ID/name以及当前ColumnFamily对应的所有的version(dummy_versions_). 其中这里的next_/prev_就是在ColumnFamilySet中用来表示所有ColumnFamily的双向链表.

  1. class ColumnFamilyData {
  2. public:
  3. ~ColumnFamilyData();
  4. // thread-safe
  5. uint32_t GetID() const { return id_; }
  6. // thread-safe
  7. const std::string& GetName() const { return name_; }
  8. // Ref() can only be called from a context where the caller can guarantee
  9. // that ColumnFamilyData is alive (while holding a non-zero ref already,
  10. // holding a DB mutex, or as the leader in a write batch group).
  11. void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
  12. // Unref decreases the reference count, but does not handle deletion
  13. // when the count goes to 0. If this method returns true then the
  14. // caller should delete the instance immediately, or later, by calling
  15. // FreeDeadColumnFamilies(). Unref() can only be called while holding
  16. // a DB mutex, or during single-threaded recovery.
  17. bool Unref() {
  18. int old_refs = refs_.fetch_sub(1, std::memory_order_relaxed);
  19. assert(old_refs > 0);
  20. return old_refs == 1;
  21. }
  22. ..............................
  23. private:
  24. friend class ColumnFamilySet;
  25. ColumnFamilyData(uint32_t id, const std::string& name,
  26. Version* dummy_versions, Cache* table_cache,
  27. WriteBufferManager* write_buffer_manager,
  28. const ColumnFamilyOptions& options,
  29. const ImmutableDBOptions& db_options,
  30. const EnvOptions& env_options,
  31. ColumnFamilySet* column_family_set);
  32. uint32_t id_;
  33. const std::string name_;
  34. Version* dummy_versions_; // Head of circular doubly-linked list of versions.
  35. Version* current_; // == dummy_versions->prev_
  36. ......................................................
  37. // Thread's local copy of SuperVersion pointer
  38. // This needs to be destructed before mutex_
  39. std::unique_ptr<ThreadLocalPtr> local_sv_;
  40. // pointers for a circular linked list. we use it to support iterations over
  41. // all column families that are alive (note: dropped column families can also
  42. // be alive as long as client holds a reference)
  43. ColumnFamilyData* next_;
  44. ColumnFamilyData* prev_;
  45. ...................................
  46. ColumnFamilySet* column_family_set_;
  47. ..................................
  48. };

然后就是返回给调用者的ColumnFamilyHandleImpl结构,这个结构主要是封装了ColumnFamilyData.

  1. // ColumnFamilyHandleImpl is the class that clients use to access different
  2. // column families. It has non-trivial destructor, which gets called when client
  3. // is done using the column family
  4. class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
  5. public:
  6. // create while holding the mutex
  7. ColumnFamilyHandleImpl(
  8. ColumnFamilyData* cfd, DBImpl* db, InstrumentedMutex* mutex);
  9. // destroy without mutex
  10. virtual ~ColumnFamilyHandleImpl();
  11. virtual ColumnFamilyData* cfd() const { return cfd_; }
  12. ......................................
  13. private:
  14. ColumnFamilyData* cfd_;
  15. DBImpl* db_;
  16. InstrumentedMutex* mutex_;
  17. };

接下来我们就来从ColumnFamily的创建以及删除来分析ColumnFamily的实现.我们从DBImpl::CreateColumnFamilyImpl开始.在这个函数 中首先就是通过调用GetNextColumnFamilyID来得到当前创建的ColumnFamily对应的ID(自增).然后再调用LogAndApply来对ColumnFamily 进行对应的操作.最后再返回封装好的ColumnFamilyHandle给调用者.

  1. Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
  2. const std::string& column_family_name,
  3. ColumnFamilyHandle** handle) {
  4. .......................................
  5. {
  6. ...................................
  7. VersionEdit edit;
  8. edit.AddColumnFamily(column_family_name);
  9. uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
  10. edit.SetColumnFamily(new_id);
  11. edit.SetLogNumber(logfile_number_);
  12. edit.SetComparatorName(cf_options.comparator->Name());
  13. // LogAndApply will both write the creation in MANIFEST and create
  14. // ColumnFamilyData object
  15. { // write thread
  16. WriteThread::Writer w;
  17. write_thread_.EnterUnbatched(&w, &mutex_);
  18. // LogAndApply will both write the creation in MANIFEST and create
  19. // ColumnFamilyData object
  20. s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
  21. &mutex_, directories_.GetDbDir(), false,
  22. &cf_options);
  23. write_thread_.ExitUnbatched(&w);
  24. }
  25. if (s.ok()) {
  26. ........................................
  27. *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
  28. ROCKS_LOG_INFO(immutable_db_options_.info_log,
  29. "Created column family [%s] (ID %u)",
  30. column_family_name.c_str(), (unsigned)cfd->GetID());
  31. }
  32. .............................................
  33. } // InstrumentedMutexLock l(&mutex_)
  34. .................................
  35. return s;
  36. }

最终会在LogAndApply调用ColumnFamilySet的CreateColumnFamily函数(通过VersionSet::CreateColumnFamily),这个函数我们可看到主要做了下面三件事情

  1. 创建ColumnFamilyData对象
  2. 将新的创建好的CFD加入到双向链表
  3. 对应的Map数据结构更新数据

    1. // under a DB mutex AND write thread
    2. ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    3. const std::string& name, uint32_t id, Version* dummy_versions,
    4. const ColumnFamilyOptions& options) {
    5. assert(column_families_.find(name) == column_families_.end());
    6. ColumnFamilyData* new_cfd = new ColumnFamilyData(
    7. id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
    8. *db_options_, env_options_, this);
    9. column_families_.insert({name, id});
    10. column_family_data_.insert({id, new_cfd});
    11. max_column_family_ = std::max(max_column_family_, id);
    12. // add to linked list
    13. new_cfd->next_ = dummy_cfd_;
    14. auto prev = dummy_cfd_->prev_;
    15. new_cfd->prev_ = prev;
    16. prev->next_ = new_cfd;
    17. dummy_cfd_->prev_ = new_cfd;
    18. if (id == 0) {
    19. default_cfd_cache_ = new_cfd;
    20. }
    21. return new_cfd;
    22. }

    然后来看如何删除ColumnFamily,这里所有的删除最终都会调用ColumnFamilySet::RemoveColumnFamily函数,这个函数是是从两个Map中删除对应的ColumnFamily. 这里或许我们要问了,为什么管理的双向链表不需要删除呢。这里原因是这样的,由于ColumnFamilyData是通过引用计数管理的,因此只有当所有的引用计数都清零之后, 才需要真正的函数ColumnFamilyData(也就是会从双向链表中删除数据).

    1. // under a DB mutex AND from a write thread
    2. void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
    3. auto cfd_iter = column_family_data_.find(cfd->GetID());
    4. assert(cfd_iter != column_family_data_.end());
    5. column_family_data_.erase(cfd_iter);
    6. column_families_.erase(cfd->GetName());
    7. }

因此我们来看ColumnFamilyData的析构函数.可以看到析构函数中会从双向链表中删除对应的数据,以及处理对应的Version(corrent_).

  1. // DB mutex held
  2. ColumnFamilyData::~ColumnFamilyData() {
  3. assert(refs_.load(std::memory_order_relaxed) == 0);
  4. // remove from linked list
  5. auto prev = prev_;
  6. auto next = next_;
  7. prev->next_ = next;
  8. next->prev_ = prev;
  9. if (!dropped_ && column_family_set_ != nullptr) {
  10. // If it's dropped, it's already removed from column family set
  11. // If column_family_set_ == nullptr, this is dummy CFD and not in
  12. // ColumnFamilySet
  13. column_family_set_->RemoveColumnFamily(this);
  14. }
  15. if (current_ != nullptr) {
  16. current_->Unref();
  17. }
  18. ..............................
  19. }

最后我们来看一下在磁盘上ColumnFamily是如何保存的,首先需要明确的是ColumnFamily是保存在MANIFEST文件中的,信息的保存比较简单(之前的文章有介绍), 和MANIFEST中其他的信息没什么区别,因此这里我们主要来看数据的读取以及初始化,这里所有的操作都是包含在VersionSet::Recover中,我们来看这个函数.

函数主要的逻辑就是读取MANIFEST然后来再来将磁盘上读取的ColumnFamily的信息初始化(初始化ColumnFamilySet结构),可以看到这里相当于将之前的create/drop 的操作全部回放一遍,也就是会调用CreateColumnFamily/DropColumnFamily来将磁盘的信息初始化到内存.

  1. while (reader.ReadRecord(&record, &scratch) && s.ok()) {
  2. VersionEdit edit;
  3. s = edit.DecodeFrom(record);
  4. if (!s.ok()) {
  5. break;
  6. }
  7. // Not found means that user didn't supply that column
  8. // family option AND we encountered column family add
  9. // record. Once we encounter column family drop record,
  10. // we will delete the column family from
  11. // column_families_not_found.
  12. bool cf_in_not_found =
  13. column_families_not_found.find(edit.column_family_) !=
  14. column_families_not_found.end();
  15. // in builders means that user supplied that column family
  16. // option AND that we encountered column family add record
  17. bool cf_in_builders =
  18. builders.find(edit.column_family_) != builders.end();
  19. // they can't both be true
  20. assert(!(cf_in_not_found && cf_in_builders));
  21. ColumnFamilyData* cfd = nullptr;
  22. if (edit.is_column_family_add_) {
  23. if (cf_in_builders || cf_in_not_found) {
  24. s = Status::Corruption(
  25. "Manifest adding the same column family twice");
  26. break;
  27. }
  28. auto cf_options = cf_name_to_options.find(edit.column_family_name_);
  29. if (cf_options == cf_name_to_options.end()) {
  30. column_families_not_found.insert(
  31. {edit.column_family_, edit.column_family_name_});
  32. } else {
  33. cfd = CreateColumnFamily(cf_options->second, &edit);
  34. cfd->set_initialized();
  35. builders.insert(
  36. {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
  37. }
  38. } else if (edit.is_column_family_drop_) {
  39. if (cf_in_builders) {
  40. auto builder = builders.find(edit.column_family_);
  41. assert(builder != builders.end());
  42. delete builder->second;
  43. builders.erase(builder);
  44. cfd = column_family_set_->GetColumnFamily(edit.column_family_);
  45. if (cfd->Unref()) {
  46. delete cfd;
  47. cfd = nullptr;
  48. } else {
  49. // who else can have reference to cfd!?
  50. assert(false);
  51. }
  52. } else if (cf_in_not_found) {
  53. column_families_not_found.erase(edit.column_family_);
  54. } else {
  55. s = Status::Corruption(
  56. "Manifest - dropping non-existing column family");
  57. break;
  58. }
  59. } else if (!cf_in_not_found) {
  60. if (!cf_in_builders) {
  61. s = Status::Corruption(
  62. "Manifest record referencing unknown column family");
  63. break;
  64. }
  65. cfd = column_family_set_->GetColumnFamily(edit.column_family_);
  66. // this should never happen since cf_in_builders is true
  67. assert(cfd != nullptr);
  68. // if it is not column family add or column family drop,
  69. // then it's a file add/delete, which should be forwarded
  70. // to builder
  71. auto builder = builders.find(edit.column_family_);
  72. assert(builder != builders.end());
  73. builder->second->version_builder()->Apply(&edit);
  74. }
  75. if (cfd != nullptr) {
  76. if (edit.has_log_number_) {
  77. if (cfd->GetLogNumber() > edit.log_number_) {
  78. ROCKS_LOG_WARN(
  79. db_options_->info_log,
  80. "MANIFEST corruption detected, but ignored - Log numbers in "
  81. "records NOT monotonically increasing");
  82. } else {
  83. cfd->SetLogNumber(edit.log_number_);
  84. have_log_number = true;
  85. }
  86. }
  87. if (edit.has_comparator_ &&
  88. edit.comparator_ != cfd->user_comparator()->Name()) {
  89. s = Status::InvalidArgument(
  90. cfd->user_comparator()->Name(),
  91. "does not match existing comparator " + edit.comparator_);
  92. break;
  93. }
  94. }
  95. if (edit.has_prev_log_number_) {
  96. previous_log_number = edit.prev_log_number_;
  97. have_prev_log_number = true;
  98. }
  99. if (edit.has_next_file_number_) {
  100. next_file = edit.next_file_number_;
  101. have_next_file = true;
  102. }
  103. if (edit.has_max_column_family_) {
  104. max_column_family = edit.max_column_family_;
  105. }
  106. if (edit.has_last_sequence_) {
  107. last_sequence = edit.last_sequence_;
  108. have_last_sequence = true;
  109. }
  110. }