第二节 Bucket遍历之Cursor
本节我们先做一节内容的铺垫,暂时不讲如何创建、获取、删除一个Bucket。而是介绍一个boltdb中的新对象Cursor。
答案是:所有的上述操作都是建立在首先定位到一个Bucket所属的位置,然后才能对其进行操作。而定位一个Bucket的功能就是由Cursor来完成的。所以我们先这一节给大家介绍一下boltdb中的Cursor。
我们先看下官方文档对Cursor的描述
Cursor represents an iterator that can traverse over all key/value pairs in a bucket in sorted order.
用大白话讲,既然一个Bucket逻辑上是一颗b+树,那就意味着我们可以对其进行遍历。前面提到的set、get操作,无非是要在Bucket上先找到合适的位置,然后再进行操作。而“找”这个操作就是交由Cursor来完成的。简而言之对Bucket这颗b+树的遍历工作由Cursor来执行。一个Bucket对象关联一个Cursor。下面我们先看看Bucket和Cursor之间的关系。
// Cursor creates a cursor associated with the bucket.
// The cursor is only valid as long as the transaction is open.
// Do not use a cursor after the transaction is closed.
func (b *Bucket) Cursor() *Cursor {
// Update transaction statistics.
b.tx.stats.CursorCount++
// Allocate and return a cursor.
return &Cursor{
bucket: b,
stack: make([]elemRef, 0),
}
}
3.2.1 Cursor结构
从上面可以清楚的看到,在获取一个游标Cursor对象时,会将当前的Bucket对象传进去,同时还初始化了一个栈对象,结合数据结构中学习的树的知识。我们也就知道,它的内部就是对树进行遍历。下面我们详细介绍Cursor这个人物。
// Cursor represents an iterator that can traverse over all key/value pairs in a bucket in sorted order.
// Cursors see nested buckets with value == nil.
// Cursors can be obtained from a transaction and are valid as long as the transaction is open.
//
// Keys and values returned from the cursor are only valid for the life of the transaction.
//
// Changing data while traversing with a cursor may cause it to be invalidated
// and return unexpected keys and/or values. You must reposition your cursor
// after mutating data.
type Cursor struct {
bucket *Bucket
// 保存遍历搜索的路径
stack []elemRef
}
// elemRef represents a reference to an element on a given page/node.
type elemRef struct {
page *page
node *node
index int
}
// isLeaf returns whether the ref is pointing at a leaf page/node.
func (r *elemRef) isLeaf() bool {
if r.node != nil {
return r.node.isLeaf
}
return (r.page.flags & leafPageFlag) != 0
}
// count returns the number of inodes or page elements.
func (r *elemRef) count() int {
if r.node != nil {
return len(r.node.inodes)
}
return int(r.page.count)
}
3.2.2 Cursor对外接口
下面我们看一下Cursor对外暴露的接口有哪些。看之前也可以心里先想一下。针对一棵树我们需要哪些遍历接口呢?
主体也就是三类:定位到某一个元素的位置、在当前位置从前往后找、在当前位置从后往前找。
// First moves the cursor to the first item in the bucket and returns its key and value.
// If the bucket is empty then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) First() (key []byte, value []byte)
// Last moves the cursor to the last item in the bucket and returns its key and value.
// If the bucket is empty then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Last() (key []byte, value []byte)
// Next moves the cursor to the next item in the bucket and returns its key and value.
// If the cursor is at the end of the bucket then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Next() (key []byte, value []byte)
// Prev moves the cursor to the previous item in the bucket and returns its key and value.
// If the cursor is at the beginning of the bucket then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Prev() (key []byte, value []byte)
// Delete removes the current key/value under the cursor from the bucket.
// Delete fails if current key/value is a bucket or if the transaction is not writable.
func (c *Cursor) Delete() error
// Seek moves the cursor to a given key and returns it.
// If the key does not exist then the next key is used. If no keys
// follow, a nil key is returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Seek(seek []byte) (key []byte, value []byte)
下面我们详细分析一下Seek()、First()、Last()、Next()、Prev()、Delete()这三个方法的内部实现。其余的方法我们代码就不贴出来了。大致思路可以梳理一下。
3.2.3 Seek(key)实现分析
Seek()方法内部主要调用了seek()私有方法,我们重点关注seek()这个方法的实现,该方法有三个返回值,前两个为key、value、第三个为叶子节点的类型。前面提到在boltdb中,叶子节点元素有两种类型:一种是嵌套的子桶、一种是普通的key/value。而这二者就是通过flags来区分的。如果叶子节点元素为嵌套的子桶时,返回的flags为1,也就是bucketLeafFlag取值。
// Seek moves the cursor to a given key and returns it.
// If the key does not exist then the next key is used. If no keys
// follow, a nil key is returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Seek(seek []byte) (key []byte, value []byte) {
k, v, flags := c.seek(seek)
// If we ended up after the last element of a page then move to the next one.
// 下面这一段逻辑是必须的,因为在seek()方法中,如果ref.index>ref.count()的话,就直接返回nil,nil,0了
// 这里需要返回下一个
if ref := &c.stack[len(c.stack)-1]; ref.index >= ref.count() {
k, v, flags = c.next()
}
if k == nil {
return nil, nil
// 子桶的话
} else if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
}
return k, v
}
// seek moves the cursor to a given key and returns it.
// If the key does not exist then the next key is used.
func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) {
_assert(c.bucket.tx.db != nil, "tx closed")
// Start from root page/node and traverse to correct page.
c.stack = c.stack[:0]
// 开始根据seek的key值搜索root
c.search(seek, c.bucket.root)
// 执行完搜索后,stack中保存了所遍历的路径
ref := &c.stack[len(c.stack)-1]
// If the cursor is pointing to the end of page/node then return nil.
if ref.index >= ref.count() {
return nil, nil, 0
}
//获取值
// If this is a bucket then return a nil value.
return c.keyValue()
}
// keyValue returns the key and value of the current leaf element.
func (c *Cursor) keyValue() ([]byte, []byte, uint32) {
//最后一个节点为叶子节点
ref := &c.stack[len(c.stack)-1]
if ref.count() == 0 || ref.index >= ref.count() {
return nil, nil, 0
}
// Retrieve value from node.
// 先从内存中取
if ref.node != nil {
inode := &ref.node.inodes[ref.index]
return inode.key, inode.value, inode.flags
}
// 其次再从文件page中取
// Or retrieve value from page.
elem := ref.page.leafPageElement(uint16(ref.index))
return elem.key(), elem.value(), elem.flags
}
seek()中最核心的方法就是调用search()了,search()方法中,传入的就是要搜索的key值和该桶的root节点。search()方法中,其内部是通过递归的层层往下搜索,下面我们详细了解一下search()内部的实现机制。
// 从根节点开始遍历
// search recursively performs a binary search against a given page/node until it finds a given key.
func (c *Cursor) search(key []byte, pgid pgid) {
// root,3
// 层层找page,bucket->tx->db->dataref
p, n := c.bucket.pageNode(pgid)
if p != nil && (p.flags&(branchPageFlag|leafPageFlag)) == 0 {
panic(fmt.Sprintf("invalid page type: %d: %x", p.id, p.flags))
}
e := elemRef{page: p, node: n}
//记录遍历过的路径
c.stack = append(c.stack, e)
// If we're on a leaf page/node then find the specific node.
// 如果是叶子节点,找具体的值node
if e.isLeaf() {
c.nsearch(key)
return
}
if n != nil {
// 先搜索node,因为node是加载到内存中的
c.searchNode(key, n)
return
}
// 其次再在page中搜索
c.searchPage(key, p)
}
// pageNode returns the in-memory node, if it exists.
// Otherwise returns the underlying page.
func (b *Bucket) pageNode(id pgid) (*page, *node) {
// Inline buckets have a fake page embedded in their value so treat them
// differently. We'll return the rootNode (if available) or the fake page.
// 内联页的话,就直接返回其page了
if b.root == 0 {
if id != 0 {
panic(fmt.Sprintf("inline bucket non-zero page access(2): %d != 0", id))
}
if b.rootNode != nil {
return nil, b.rootNode
}
return b.page, nil
}
// Check the node cache for non-inline buckets.
if b.nodes != nil {
if n := b.nodes[id]; n != nil {
return nil, n
}
}
// Finally lookup the page from the transaction if no node is materialized.
return b.tx.page(id), nil
}
//node中搜索
func (c *Cursor) searchNode(key []byte, n *node) {
var exact bool
//二分搜索
index := sort.Search(len(n.inodes), func(i int) bool {
// TODO(benbjohnson): Optimize this range search. It's a bit hacky right now.
// sort.Search() finds the lowest index where f() != -1 but we need the highest index.
ret := bytes.Compare(n.inodes[i].key, key)
if ret == 0 {
exact = true
}
return ret != -1
})
if !exact && index > 0 {
index--
}
c.stack[len(c.stack)-1].index = index
// Recursively search to the next page.
c.search(key, n.inodes[index].pgid)
}
//页中搜索
func (c *Cursor) searchPage(key []byte, p *page) {
// Binary search for the correct range.
inodes := p.branchPageElements()
var exact bool
index := sort.Search(int(p.count), func(i int) bool {
// TODO(benbjohnson): Optimize this range search. It's a bit hacky right now.
// sort.Search() finds the lowest index where f() != -1 but we need the highest index.
ret := bytes.Compare(inodes[i].key(), key)
if ret == 0 {
exact = true
}
return ret != -1
})
if !exact && index > 0 {
index--
}
c.stack[len(c.stack)-1].index = index
// Recursively search to the next page.
c.search(key, inodes[index].pgid)
}
// nsearch searches the leaf node on the top of the stack for a key.
// 搜索叶子页
func (c *Cursor) nsearch(key []byte) {
e := &c.stack[len(c.stack)-1]
p, n := e.page, e.node
// If we have a node then search its inodes.
// 先搜索node
if n != nil {
//又是二分搜索
index := sort.Search(len(n.inodes), func(i int) bool {
return bytes.Compare(n.inodes[i].key, key) != -1
})
e.index = index
return
}
// If we have a page then search its leaf elements.
// 再搜索page
inodes := p.leafPageElements()
index := sort.Search(int(p.count), func(i int) bool {
return bytes.Compare(inodes[i].key(), key) != -1
})
e.index = index
}
到这儿我们就已经看完所有的seek()查找一个key的过程了,其内部也很简单,就是从根节点开始,通过不断递归遍历每层节点,采用二分法来定位到具体的叶子节点。到达叶子节点时,其叶子节点内部存储的数据也是有序的,因此继续按照二分查找来找到最终的下标。
值得需要注意点:
在遍历时,我们都知道,有可能遍历到的当前分支节点数据并没有在内存中,此时就需要从page中加载数据遍历。所以在遍历过程中,优先在node中找,如果node为空的时候才会采用page来查找。
3.2.4 First()、Last()实现分析
前面看了定位到具体某个key的一个过程,现在我们看一下,在定位到第一个元素时,我们知道它一定是位于最左侧的第一个叶子节点的第一个元素。同理,在定位到最后一个元素时,它一定是位于最右侧的第一个叶子节点的最后一个元素。下面是其内部的实现逻辑:
First()实现
// First moves the cursor to the first item in the bucket and returns its key and value.
// If the bucket is empty then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) First() (key []byte, value []byte) {
_assert(c.bucket.tx.db != nil, "tx closed")
// 清空stack
c.stack = c.stack[:0]
p, n := c.bucket.pageNode(c.bucket.root)
// 一直找到第一个叶子节点,此处在天添加stack时,一直让index设置为0即可
ref := elemRef{page: p, node: n, index: 0}
c.stack = append(c.stack, ref)
c.first()
// If we land on an empty page then move to the next value.
// https://github.com/boltdb/bolt/issues/450
// 当前页时空的话,找下一个
if c.stack[len(c.stack)-1].count() == 0 {
c.next()
}
k, v, flags := c.keyValue()
// 是桶
if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
}
return k, v
}
// first moves the cursor to the first leaf element under the last page in the stack.
// 找到最后一个非叶子节点的第一个叶子节点。index=0的节点
func (c *Cursor) first() {
for {
// Exit when we hit a leaf page.
var ref = &c.stack[len(c.stack)-1]
if ref.isLeaf() {
break
}
// Keep adding pages pointing to the first element to the stack.
var pgid pgid
if ref.node != nil {
pgid = ref.node.inodes[ref.index].pgid
} else {
pgid = ref.page.branchPageElement(uint16(ref.index)).pgid
}
p, n := c.bucket.pageNode(pgid)
c.stack = append(c.stack, elemRef{page: p, node: n, index: 0})
}
}
Last()实现
// Last moves the cursor to the last item in the bucket and returns its key and value.
// If the bucket is empty then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Last() (key []byte, value []byte) {
_assert(c.bucket.tx.db != nil, "tx closed")
c.stack = c.stack[:0]
p, n := c.bucket.pageNode(c.bucket.root)
ref := elemRef{page: p, node: n}
// 设置其index为当前页元素的最后一个
ref.index = ref.count() - 1
c.stack = append(c.stack, ref)
c.last()
k, v, flags := c.keyValue()
if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
}
return k, v
}
// last moves the cursor to the last leaf element under the last page in the stack.
// 移动到栈中最后一个节点的最后一个叶子节点
func (c *Cursor) last() {
for {
// Exit when we hit a leaf page.
ref := &c.stack[len(c.stack)-1]
if ref.isLeaf() {
break
}
// Keep adding pages pointing to the last element in the stack.
var pgid pgid
if ref.node != nil {
pgid = ref.node.inodes[ref.index].pgid
} else {
pgid = ref.page.branchPageElement(uint16(ref.index)).pgid
}
p, n := c.bucket.pageNode(pgid)
var nextRef = elemRef{page: p, node: n}
nextRef.index = nextRef.count() - 1
c.stack = append(c.stack, nextRef)
}
}
3.2.5 Next()、Prev()实现分析
再此我们从当前位置查找前一个或者下一个时,需要注意一个问题,如果当前节点中元素已经完了,那么此时需要回退到遍历路径的上一个节点。然后再继续查找,下面进行代码分析。
Next()分析
// Next moves the cursor to the next item in the bucket and returns its key and value.
// If the cursor is at the end of the bucket then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Next() (key []byte, value []byte) {
_assert(c.bucket.tx.db != nil, "tx closed")
k, v, flags := c.next()
if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
}
return k, v
}
// next moves to the next leaf element and returns the key and value.
// If the cursor is at the last leaf element then it stays there and returns nil.
func (c *Cursor) next() (key []byte, value []byte, flags uint32) {
for {
// Attempt to move over one element until we're successful.
// Move up the stack as we hit the end of each page in our stack.
var i int
for i = len(c.stack) - 1; i >= 0; i-- {
elem := &c.stack[i]
if elem.index < elem.count()-1 {
// 元素还有时,往后移动一个
elem.index++
break
}
}
// 最后的结果elem.index++
// If we've hit the root page then stop and return. This will leave the
// cursor on the last element of the last page.
// 所有页都遍历完了
if i == -1 {
return nil, nil, 0
}
// Otherwise start from where we left off in the stack and find the
// first element of the first leaf page.
// 剩余的节点里面找,跳过原先遍历过的节点
c.stack = c.stack[:i+1]
// 如果是叶子节点,first()啥都不做,直接退出。返回elem.index+1的数据
// 非叶子节点的话,需要移动到stack中最后一个路径的第一个元素
c.first()
// If this is an empty page then restart and move back up the stack.
// https://github.com/boltdb/bolt/issues/450
if c.stack[len(c.stack)-1].count() == 0 {
continue
}
return c.keyValue()
}
}
Prev()实现
// Prev moves the cursor to the previous item in the bucket and returns its key and value.
// If the cursor is at the beginning of the bucket then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Prev() (key []byte, value []byte) {
_assert(c.bucket.tx.db != nil, "tx closed")
// Attempt to move back one element until we're successful.
// Move up the stack as we hit the beginning of each page in our stack.
for i := len(c.stack) - 1; i >= 0; i-- {
elem := &c.stack[i]
if elem.index > 0 {
// 往前移动一格
elem.index--
break
}
c.stack = c.stack[:i]
}
// If we've hit the end then return nil.
if len(c.stack) == 0 {
return nil, nil
}
// Move down the stack to find the last element of the last leaf under this branch.
// 如果当前节点是叶子节点的话,则直接退出了,啥都不做。否则的话移动到新页的最后一个节点
c.last()
k, v, flags := c.keyValue()
if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
}
return k, v
}
3.2.6 Delete()方法分析
Delete()方法中,移动当前位置的元素
// Delete removes the current key/value under the cursor from the bucket.
// Delete fails if current key/value is a bucket or if the transaction is not writable.
func (c *Cursor) Delete() error {
if c.bucket.tx.db == nil {
return ErrTxClosed
} else if !c.bucket.Writable() {
return ErrTxNotWritable
}
key, _, flags := c.keyValue()
// Return an error if current value is a bucket.
if (flags & bucketLeafFlag) != 0 {
return ErrIncompatibleValue
}
// 从node中移除,本质上将inode数组进行移动
c.node().del(key)
return nil
}
// del removes a key from the node.
func (n *node) del(key []byte) {
// Find index of key.
index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 })
// Exit if the key isn't found.
if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) {
return
}
// Delete inode from the node.
n.inodes = append(n.inodes[:index], n.inodes[index+1:]...)
// Mark the node as needing rebalancing.
n.unbalanced = true
}