第三节 node节点的相关操作
在开始分析node节点之前,我们先看一下官方对node节点的描述
node represents an in-memory, deserialized page
一个node节点,既可能是叶子节点,也可能是根节点,也可能是分支节点。是物理磁盘上读取进来的页page的内存表现形式。
3.3.1 node节点的定义
// node represents an in-memory, deserialized page.
type node struct {
bucket *Bucket // 关联一个桶
isLeaf bool
unbalanced bool // 值为true的话,需要考虑页合并
spilled bool // 值为true的话,需要考虑页分裂
key []byte // 对于分支节点的话,保留的是最小的key
pgid pgid // 分支节点关联的页id
parent *node // 该节点的parent
children nodes // 该节点的孩子节点
inodes inodes // 该节点上保存的索引数据
}
// inode represents an internal node inside of a node.
// It can be used to point to elements in a page or point
// to an element which hasn't been added to a page yet.
type inode struct {
// 表示是否是子桶叶子节点还是普通叶子节点。如果flags值为1表示子桶叶子节点,否则为普通叶子节点
flags uint32
// 当inode为分支元素时,pgid才有值,为叶子元素时,则没值
pgid pgid
key []byte
// 当inode为分支元素时,value为空,为叶子元素时,才有值
value []byte
}
type inodes []inode
3.3.2 node节点和page转换
在node对象上有两个方法,read(page)、write(page),其中read(page)方法是用来通过page构建一个node节点;而write(page)方法则是将当前的node节点写入到page中,我们在前面他提到了node节点和page节点的相互转换,此处为了保证内容完整性,我们还是再补充下,同时也给大家加深下影响,展示下同样的数据在磁盘上如何组织的,在内存中又是如何组织的。
node->page
// write writes the items onto one or more pages.
// 将node转为page
func (n *node) write(p *page) {
// Initialize page.
// 判断是否是叶子节点还是非叶子节点
if n.isLeaf {
p.flags |= leafPageFlag
} else {
p.flags |= branchPageFlag
}
// 这儿叶子节点不可能溢出,因为溢出时,会分裂
if len(n.inodes) >= 0xFFFF {
panic(fmt.Sprintf("inode overflow: %d (pgid=%d)", len(n.inodes), p.id))
}
p.count = uint16(len(n.inodes))
// Stop here if there are no items to write.
if p.count == 0 {
return
}
// Loop over each item and write it to the page.
// b指向的指针为提逃过所有item头部的位置
b := (*[maxAllocSize]byte)(unsafe.Pointer(&p.ptr))[n.pageElementSize()*len(n.inodes):]
for i, item := range n.inodes {
_assert(len(item.key) > 0, "write: zero-length inode key")
// Write the page element.
// 写入叶子节点数据
if n.isLeaf {
elem := p.leafPageElement(uint16(i))
elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
elem.flags = item.flags
elem.ksize = uint32(len(item.key))
elem.vsize = uint32(len(item.value))
} else {
// 写入分支节点数据
elem := p.branchPageElement(uint16(i))
elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
elem.ksize = uint32(len(item.key))
elem.pgid = item.pgid
_assert(elem.pgid != p.id, "write: circular dependency occurred")
}
// If the length of key+value is larger than the max allocation size
// then we need to reallocate the byte array pointer.
//
// See: https://github.com/boltdb/bolt/pull/335
klen, vlen := len(item.key), len(item.value)
if len(b) < klen+vlen {
b = (*[maxAllocSize]byte)(unsafe.Pointer(&b[0]))[:]
}
// Write data for the element to the end of the page.
copy(b[0:], item.key)
b = b[klen:]
copy(b[0:], item.value)
b = b[vlen:]
}
// DEBUG ONLY: n.dump()
}
page->node
// 根据page来初始化node
// read initializes the node from a page.
func (n *node) read(p *page) {
n.pgid = p.id
n.isLeaf = ((p.flags & leafPageFlag) != 0)
// 一个inodes对应一个xxxPageElement对象
n.inodes = make(inodes, int(p.count))
for i := 0; i < int(p.count); i++ {
inode := &n.inodes[i]
if n.isLeaf {
// 获取第i个叶子节点
elem := p.leafPageElement(uint16(i))
inode.flags = elem.flags
inode.key = elem.key()
inode.value = elem.value()
} else {
// 树枝节点
elem := p.branchPageElement(uint16(i))
inode.pgid = elem.pgid
inode.key = elem.key()
}
_assert(len(inode.key) > 0, "read: zero-length inode key")
}
// Save first key so we can find the node in the parent when we spill.
if len(n.inodes) > 0 {
// 保存第1个元素的值
n.key = n.inodes[0].key
_assert(len(n.key) > 0, "read: zero-length node key")
} else {
n.key = nil
}
}
3.3.3 node节点的增删改查
put(k,v)
// put inserts a key/value.
// 如果put的是一个key、value的话,不需要指定pgid。
// 如果put的一个树枝节点,则需要指定pgid,不需要指定value
func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
if pgid >= n.bucket.tx.meta.pgid {
panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", pgid, n.bucket.tx.meta.pgid))
} else if len(oldKey) <= 0 {
panic("put: zero-length old key")
} else if len(newKey) <= 0 {
panic("put: zero-length new key")
}
// Find insertion index.
index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 })
// Add capacity and shift nodes if we don't have an exact match and need to insert.
exact := (len(n.inodes) > 0 && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey))
if !exact {
n.inodes = append(n.inodes, inode{})
copy(n.inodes[index+1:], n.inodes[index:])
}
inode := &n.inodes[index]
inode.flags = flags
inode.key = newKey
inode.value = value
inode.pgid = pgid
_assert(len(inode.key) > 0, "put: zero-length inode key")
}
get(k)
在node中,没有get(k)的方法,其本质是在Cursor中就返回了get的数据。大家可以看看Cursor中的keyValue()方法。
del(k)
// del removes a key from the node.
func (n *node) del(key []byte) {
// Find index of key.
index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 })
// Exit if the key isn't found.
if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) {
return
}
// Delete inode from the node.
n.inodes = append(n.inodes[:index], n.inodes[index+1:]...)
// Mark the node as needing rebalancing.
n.unbalanced = true
}
nextSibling()、prevSibling()
// nextSibling returns the next node with the same parent.
// 返回下一个兄弟节点
func (n *node) nextSibling() *node {
if n.parent == nil {
return nil
}
index := n.parent.childIndex(n)
if index >= n.parent.numChildren()-1 {
return nil
}
return n.parent.childAt(index + 1)
}
// prevSibling returns the previous node with the same parent.
// 返回上一个兄弟节点
func (n *node) prevSibling() *node {
if n.parent == nil {
return nil
}
// 首先找下标
index := n.parent.childIndex(n)
if index == 0 {
return nil
}
// 然后返回
return n.parent.childAt(index - 1)
}
// childIndex returns the index of a given child node.
func (n *node) childIndex(child *node) int {
index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, child.key) != -1 })
return index
}
// childAt returns the child node at a given index.
// 只有树枝节点才有孩子
func (n *node) childAt(index int) *node {
if n.isLeaf {
panic(fmt.Sprintf("invalid childAt(%d) on a leaf node", index))
}
return n.bucket.node(n.inodes[index].pgid, n)
}
// node creates a node from a page and associates it with a given parent.
// 根据pgid创建一个node
func (b *Bucket) node(pgid pgid, parent *node) *node {
_assert(b.nodes != nil, "nodes map expected")
// Retrieve node if it's already been created.
if n := b.nodes[pgid]; n != nil {
return n
}
// Otherwise create a node and cache it.
n := &node{bucket: b, parent: parent}
if parent == nil {
b.rootNode = n
} else {
parent.children = append(parent.children, n)
}
// Use the inline page if this is an inline bucket.
// 如果第二次进来,b.page不为空
// 此处的pgid和b.page只会有一个是有值的。
var p = b.page
// 说明不是内联桶
if p == nil {
p = b.tx.page(pgid)
}
// Read the page into the node and cache it.
n.read(p)
// 缓存
b.nodes[pgid] = n
// Update statistics.
b.tx.stats.NodeCount++
return n
}
3.3.4 node节点的分裂和合并
上面我们看了对node节点的操作,包括put和del方法。经过这些操作后,可能会导致当前的page填充度过高或者过低。因此就引出了node节点的分裂和合并。下面简单介绍下什么是分裂和合并。
分裂: 当一个node中的数据过多时,最简单就是当超过了page的填充度时,就需要将当前的node拆分成两个,也就是底层会将一页数据拆分存放到两页中。
合并: 当删除了一个或者一批对象时,此时可能会导致一页数据的填充度过低,此时空间可能会浪费比较多。所以就需要考虑对页之间进行数据合并。
有了大概的了解,下面我们就看一下对一个node分裂和合并的实现过程。
分裂spill()
spill writes the nodes to dirty pages and splits nodes as it goes. Returns an error if dirty pages cannot be allocated.
// spill writes the nodes to dirty pages and splits nodes as it goes.
// Returns an error if dirty pages cannot be allocated.
func (n *node) spill() error {
var tx = n.bucket.tx
if n.spilled {
return nil
}
// Spill child nodes first. Child nodes can materialize sibling nodes in
// the case of split-merge so we cannot use a range loop. We have to check
// the children size on every loop iteration.
sort.Sort(n.children)
for i := 0; i < len(n.children); i++ {
if err := n.children[i].spill(); err != nil {
return err
}
}
// We no longer need the child list because it's only used for spill tracking.
n.children = nil
// Split nodes into appropriate sizes. The first node will always be n.
// 将当前的node进行拆分成多个node
var nodes = n.split(tx.db.pageSize)
for _, node := range nodes {
// Add node's page to the freelist if it's not new.
if node.pgid > 0 {
tx.db.freelist.free(tx.meta.txid, tx.page(node.pgid))
node.pgid = 0
}
// Allocate contiguous space for the node.
p, err := tx.allocate((node.size() / tx.db.pageSize) + 1)
if err != nil {
return err
}
// Write the node.
if p.id >= tx.meta.pgid {
// 不可能发生
panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid))
}
node.pgid = p.id
node.write(p)
// 已经拆分过了
node.spilled = true
// Insert into parent inodes.
if node.parent != nil {
var key = node.key
if key == nil {
key = node.inodes[0].key
}
// 放入父亲节点中
node.parent.put(key, node.inodes[0].key, nil, node.pgid, 0)
node.key = node.inodes[0].key
_assert(len(node.key) > 0, "spill: zero-length node key")
}
// Update the statistics.
tx.stats.Spill++
}
// If the root node split and created a new root then we need to spill that
// as well. We'll clear out the children to make sure it doesn't try to respill.
if n.parent != nil && n.parent.pgid == 0 {
n.children = nil
return n.parent.spill()
}
return nil
}
// split breaks up a node into multiple smaller nodes, if appropriate.
// This should only be called from the spill() function.
func (n *node) split(pageSize int) []*node {
var nodes []*node
node := n
for {
// Split node into two.
a, b := node.splitTwo(pageSize)
nodes = append(nodes, a)
// If we can't split then exit the loop.
if b == nil {
break
}
// Set node to b so it gets split on the next iteration.
node = b
}
return nodes
}
// splitTwo breaks up a node into two smaller nodes, if appropriate.
// This should only be called from the split() function.
func (n *node) splitTwo(pageSize int) (*node, *node) {
// Ignore the split if the page doesn't have at least enough nodes for
// two pages or if the nodes can fit in a single page.
// 太小的话,就不拆分了
if len(n.inodes) <= (minKeysPerPage*2) || n.sizeLessThan(pageSize) {
return n, nil
}
// Determine the threshold before starting a new node.
var fillPercent = n.bucket.FillPercent
if fillPercent < minFillPercent {
fillPercent = minFillPercent
} else if fillPercent > maxFillPercent {
fillPercent = maxFillPercent
}
threshold := int(float64(pageSize) * fillPercent)
// Determine split position and sizes of the two pages.
splitIndex, _ := n.splitIndex(threshold)
// Split node into two separate nodes.
// If there's no parent then we'll need to create one.
if n.parent == nil {
n.parent = &node{bucket: n.bucket, children: []*node{n}}
}
// Create a new node and add it to the parent.
// 拆分出一个新节点
next := &node{bucket: n.bucket, isLeaf: n.isLeaf, parent: n.parent}
n.parent.children = append(n.parent.children, next)
// Split inodes across two nodes.
next.inodes = n.inodes[splitIndex:]
n.inodes = n.inodes[:splitIndex]
// Update the statistics.
n.bucket.tx.stats.Split++
return n, next
}
// splitIndex finds the position where a page will fill a given threshold.
// It returns the index as well as the size of the first page.
// This is only be called from split().
// 找到合适的index
func (n *node) splitIndex(threshold int) (index, sz int) {
sz = pageHeaderSize
// Loop until we only have the minimum number of keys required for the second page.
for i := 0; i < len(n.inodes)-minKeysPerPage; i++ {
index = i
inode := n.inodes[i]
elsize := n.pageElementSize() + len(inode.key) + len(inode.value)
// If we have at least the minimum number of keys and adding another
// node would put us over the threshold then exit and return.
if i >= minKeysPerPage && sz+elsize > threshold {
break
}
// Add the element size to the total size.
sz += elsize
}
return
}
合并rebalance()
rebalance attempts to combine the node with sibling nodes if the node fill size is below a threshold or if there are not enough keys.
页合并有点复杂,虽然能看懂,但要自己写感觉还是挺难写出bug free的
// rebalance attempts to combine the node with sibling nodes if the node fill
// size is below a threshold or if there are not enough keys.
// 填充率太低或者没有足够的key时,进行页合并
func (n *node) rebalance() {
if !n.unbalanced {
return
}
n.unbalanced = false
// Update statistics.
n.bucket.tx.stats.Rebalance++
// Ignore if node is above threshold (25%) and has enough keys.
var threshold = n.bucket.tx.db.pageSize / 4
if n.size() > threshold && len(n.inodes) > n.minKeys() {
return
}
// Root node has special handling.
if n.parent == nil {
// If root node is a branch and only has one node then collapse it.
if !n.isLeaf && len(n.inodes) == 1 {
// Move root's child up.
child := n.bucket.node(n.inodes[0].pgid, n)
n.isLeaf = child.isLeaf
n.inodes = child.inodes[:]
n.children = child.children
// Reparent all child nodes being moved.
for _, inode := range n.inodes {
if child, ok := n.bucket.nodes[inode.pgid]; ok {
child.parent = n
}
}
// Remove old child.
child.parent = nil
delete(n.bucket.nodes, child.pgid)
child.free()
}
return
}
// If node has no keys then just remove it.
if n.numChildren() == 0 {
n.parent.del(n.key)
n.parent.removeChild(n)
delete(n.bucket.nodes, n.pgid)
n.free()
n.parent.rebalance()
return
}
_assert(n.parent.numChildren() > 1, "parent must have at least 2 children")
// Destination node is right sibling if idx == 0, otherwise left sibling.
var target *node
// 判断当前node是否是parent的第一个孩子节点,是的话,就要找它的下一个兄弟节点,否则的话,就找上一个兄弟节点
var useNextSibling = (n.parent.childIndex(n) == 0)
if useNextSibling {
target = n.nextSibling()
} else {
target = n.prevSibling()
}
// If both this node and the target node are too small then merge them.
// 合并当前node和target,target合到node
if useNextSibling {
// Reparent all child nodes being moved.
for _, inode := range target.inodes {
if child, ok := n.bucket.nodes[inode.pgid]; ok {
// 之前的父亲移除该孩子
child.parent.removeChild(child)
// 重新指定父亲节点
child.parent = n
// 父亲节点指当前孩子
child.parent.children = append(child.parent.children, child)
}
}
// Copy over inodes from target and remove target.
n.inodes = append(n.inodes, target.inodes...)
n.parent.del(target.key)
n.parent.removeChild(target)
delete(n.bucket.nodes, target.pgid)
target.free()
} else {
// node合到target
// Reparent all child nodes being moved.
for _, inode := range n.inodes {
if child, ok := n.bucket.nodes[inode.pgid]; ok {
child.parent.removeChild(child)
child.parent = target
child.parent.children = append(child.parent.children, child)
}
}
// Copy over inodes to target and remove node.
target.inodes = append(target.inodes, n.inodes...)
n.parent.del(n.key)
n.parent.removeChild(n)
delete(n.bucket.nodes, n.pgid)
n.free()
}
// Either this node or the target node was deleted from the parent so rebalance it.
n.parent.rebalance()
}