数据库任务二 索引管理 任务2.1 B+树的查找 && 任务2.2 B+树的插入 任务要求 (1)结点内的查找 为了实现整个B+树的查找,首先需要实现B+树单个结点内部的查找。
1 2 3 4 5 6 7 class IxNodeHandle { int lower_bound (const char *target) const ; int upper_bound (const char *target) const ; bool LeafLookup (const char *key, Rid **value) ; page_id_t InternalLookup (const char *key) ; }
(2)B+树的查找 1 2 3 4 5 class IxIndexHandle { IxNodeHandle *FindLeafPage (const char *key, Operation operation, Transaction *transaction) ; bool GetValue (const char *key, std::vector<Rid> *result, Transaction *transaction) ; }
(3)结点内的插入 1 2 3 4 5 class IxNodeHandle { void insert_pairs (int pos, const char *key, const Rid *rid, int n) ; int Insert (const char *key, const Rid &value) ; }
(4)B+树的插入 1 2 3 4 5 6 class IxIndexHandle { bool insert_entry (const char *key, const Rid &value, Transaction *transaction) ; IxNodeHandle *Split (IxNodeHandle *node) ; void InsertIntoParent (IxNodeHandle *old_node, const char *key, IxNodeHandle *new_node, Transaction *transaction) ; }
通关截图
代码分析 (1)结点内的查找 ix_node_handle.cpp
lower_bound函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 int IxNodeHandle::lower_bound (const char *target) const { int x = 0 ; int end = page_hdr->num_key; int mid, flag; while (x < end){ mid = (x + end) / 2 ; flag = ix_compare (get_key (mid), target, file_hdr->col_type, file_hdr->col_len); if (flag < 0 ){ x = mid + 1 ; } else { end = mid; } } return x; }
采用二分查找,设定初始下限为x=0
,初始上限为end=page_hdr->num_key
,通过不断比较mid处value与target的值,不断收紧范围,直到x=end
。由于当get_key(mid)<target
时总有x = mid + 1
,直到满足条件或者所有value都比target小,即x=end=mid=page_hdr->num_key
。
upper_bound函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 int IxNodeHandle::upper_bound (const char *target) const { int x = 1 ; int end = page_hdr->num_key; int mid, flag; while (x < end){ mid = (x + end) / 2 ; flag = ix_compare (get_key (mid), target, file_hdr->col_type, file_hdr->col_len); if (flag > 0 ){ end = mid; } else { x = mid + 1 ; } } return x; }
采用二分查找,设定初始下限为x=1
,初始上限为end=page_hdr->num_key
,通过不断比较mid处value与target的值,不断收紧范围,直到x=end
。由于当get_key(mid)<=target
时总有x = mid + 1
,直到满足条件或者所有value都小于等于target,即x=end=mid=page_hdr->num_key
。
为什么初始上限为x=1
而不是x=0
,因为根据B+树的构造,为了使value的数量与key相等,于是在每个内部结点的第一个value前面额外加上第一个key,这就意味着第一个key存储的是以该结点为根结点的子树中的所有key的最小值,x上限为1则可以避免非叶结点受到影响。
LeafLookup函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 bool IxNodeHandle::LeafLookup (const char *key, Rid **value) { int pos = lower_bound (key); if (ix_compare (get_key (pos), key, file_hdr->col_type, file_hdr->col_len) == 0 ){ *value = get_rid (pos); return true ; } else { return false ; } }
首先使用lower_bound函数找到第一个满足大于等于key值的位置pos,并判断get_key(pos)和key是否相等。如果相等则说明key存在,使用get_rid获取key对应的rid。
InternalLookup函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 page_id_t IxNodeHandle::InternalLookup (const char *key) { int pos = upper_bound (key); return ValueAt (pos -1 ); }
要完成该函数,upper_bound函数
根据B+树的结构,当value位于key右边表示value>=key
,value位于key左边表示value<key
,upper_bound可以找到当前节点中第一个大于target的key的位置pos,由于key与value一一对应并且key在左value在右,所以直接pos-1
就可以找到value所对应得key。而如果使用lower_bound函数,如果value大于key值,则与上面同理。但如果value值与key值相等,得到第一个大于target的key的位置pos并不是最终结果,所寻找的value归pos-1
处得key管辖。
另一种情况如果value的值比以该结点为根结点的子树中的所有key的最小值还小,因为upper_bound的上限为1,抛开了最小值的影响,正确的找到了位于0处的value。但是lower_bound将该key考虑在内,得到了-1的pos,显然为错误的结果。位于0处的key只是作为最小值的表现形式,并不能将value进行分割。
下面是使用lower_bound的正确写法
1 2 3 4 5 6 int pos = lower_bound (key);if (ix_compare (get_key (pos), key, file_hdr->col_type, file_hdr->col_len) == 0 ) return ValueAt (pos); if ((pos-1 )<0 ) return ValueAt (pos); return ValueAt (pos-1 );
(2)B+树的查找 ix_index_handle.cpp
FindLeafPage函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 IxNodeHandle *IxIndexHandle::FindLeafPage (const char *key, Operation operation, Transaction *transaction) { IxNodeHandle *node = FetchNode (file_hdr_.root_page); while (!node->page_hdr->is_leaf){ buffer_pool_manager_->UnpinPage (node->GetPageId (), false ); node = FetchNode (node->InternalLookup (key)); }; buffer_pool_manager_->UnpinPage (node->GetPageId (), false ); return node; }
通过file_hdr_.root_page
获取根页号,使用FetchNode函数通过页号得到节点指针。非叶子节点不断调用InternalLookup函数,查找目标key所在的孩子结点,直到到达叶节点,返回叶节点即可。
GetValue函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 bool IxIndexHandle::GetValue (const char *key, std::vector<Rid> *result, Transaction *transaction) { std::scoped_lock lock{root_latch_}; IxNodeHandle *leaf = FindLeafPage (key, Operation::FIND, transaction); Rid* rid = nullptr ; buffer_pool_manager_->UnpinPage (leaf->GetPageId (), false ); if (leaf->LeafLookup (key, &rid)){ result->push_back (*rid); return true ; } else return false ; }
找到目标key值所在的叶子结点,定义空Rid,用于存储传出的所求键值对值value,并返回是否操作成功。
(3)结点内的插入 ix_node_handle.cpp
insert_pairs函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void IxNodeHandle::insert_pairs (int pos, const char *key, const Rid *rid, int n) { assert (pos >= 0 && pos <= page_hdr->num_key); int key_len = file_hdr->col_len; memmove (get_key (pos+n), get_key (pos), (page_hdr->num_key - pos) * key_len); memcpy (get_key (pos), key, n * key_len); memmove (get_rid (pos+n), get_rid (pos), (page_hdr->num_key - pos) * sizeof (Rid)); memcpy (get_rid (pos), rid, n * sizeof (Rid)); page_hdr->num_key += n; }
使用assert断言pos的合法性,下面使用系统中的memmove和memcpy函数
memmove函数的声明如下
1 void *memmove (void *str1, const void *str2, size_t n)
参数:
str1 – 指向用于存储复制内容的目标数组,类型强制转换为 void* 指针。
str2 – 指向要复制的数据源,类型强制转换为 void* 指针。
n – 要被复制的字节数。
memcpy函数的声明如下
1 void *memcpy (void *str1, const void *str2, size_t n)
参数:
str1 – 指向用于存储复制内容的目标数组,类型强制转换为 void* 指针。
str2 – 指向要复制的数据源,类型强制转换为 void* 指针。
n – 要被复制的字节数。
插入的过程是首先将pos至page_hdr->num_key
处,共(page_hdr->num_key - pos)*size
大小的内容向后移动n个位置,即搬到pos+n
位置处。其中key的size为file_hdr->col_len
,value的size为sizeof(Rid)
。
insert函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 int IxNodeHandle::Insert (const char *key, const Rid &value) { int pos = lower_bound (key); if (ix_compare (get_key (pos), key, file_hdr->col_type, file_hdr->col_len) != 0 ){ insert_pair (pos, key, value); return GetSize (); } else { return -1 ; } }
找到位置,判断是否重复,重复返回-1,不重复返回键值对数量。
(4)B+树的插入 ix_index_handle.cpp
insert_entry函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 bool IxIndexHandle::insert_entry (const char *key, const Rid &value, Transaction *transaction) { std::scoped_lock lock{root_latch_}; IxNodeHandle *leaf = FindLeafPage (key, Operation::INSERT, transaction); if (leaf->Insert (key, value) == -1 ){ buffer_pool_manager_->UnpinPage (leaf->GetPageId (), false ); return false ; } else if (leaf->GetSize () == leaf->GetMaxSize ()){ IxNodeHandle *new_leaf = Split (leaf); if (leaf->GetPageNo () == file_hdr_.last_leaf){ file_hdr_.last_leaf = new_leaf->GetPageNo (); } InsertIntoParent (leaf, new_leaf->get_key (0 ), new_leaf, transaction); buffer_pool_manager_->UnpinPage (new_leaf->GetPageId (), true ); } buffer_pool_manager_->UnpinPage (leaf->GetPageId (), false ); return true ; }
根据key值找到应该插入的叶节点,执行插入操作,如果插入失败(重复)则未操作返回false,插入成功后对结点数量判断,如果满则调用split函数。由于B+树结构,需要将新节点的第一个key值插入父节点。若当前叶子节点是最右叶子节点,则需要更新file_hdr_.last_leaf
。
split函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 IxNodeHandle *IxIndexHandle::Split (IxNodeHandle *node) { new_node->page_hdr->num_key = 0 ; new_node->page_hdr->next_free_page_no = node->page_hdr->next_free_page_no; new_node->page_hdr->parent = node->GetParentPageNo (); bool judge_leaf = node->page_hdr->is_leaf; new_node->page_hdr->is_leaf = judge_leaf; if (judge_leaf){ new_node->page_hdr->next_leaf = node->page_hdr->next_leaf; new_node->page_hdr->prev_leaf = node->GetPageNo (); node->page_hdr->next_leaf = new_node->GetPageNo (); IxNodeHandle* next_node = FetchNode (new_node->page_hdr->next_leaf); next_node->page_hdr->prev_leaf = new_node->GetPageNo (); buffer_pool_manager_->UnpinPage (next_node->GetPageId (), true ); } int mid = node->page_hdr->num_key / 2 ; new_node->insert_pairs (0 , node->get_key (mid), node->get_rid (mid), node->page_hdr->num_key - mid); node->page_hdr->num_key = mid; if (!judge_leaf){ for (int i = 0 ; i < new_node->page_hdr->num_key; i++){ maintain_child (new_node, i); } } return new_node; }
首先将原结点的键值对平均分配,右半部分分裂为新的右兄弟结点,对新节点的page_hdr
内容进行初始化。下面对节点是否为叶子节点进行讨论。
如果右兄弟结点是叶子结点,更新新旧节点的prev_leaf
和next_leaf
指针为新节点分配键值对,更新旧节点的键值对数记录。而非叶子节点没有前置叶节点和后置叶结点,所以可以略去。
如果新的右兄弟结点不是叶子结点,则其存在孩子节点,需要更新该结点的所有孩子结点的父节点信息。
InsertIntoParent函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 void IxIndexHandle::InsertIntoParent (IxNodeHandle *old_node, const char *key, IxNodeHandle *new_node, Transaction *transaction) { if (old_node->IsRootPage ()){ IxNodeHandle *new_root = CreateNode (); new_root->page_hdr->is_leaf = 0 ; new_root->page_hdr->num_key = 0 ; new_root->page_hdr->parent = INVALID_PAGE_ID; new_root->page_hdr->next_free_page_no = IX_NO_PAGE; new_root->insert_pair (0 , old_node->get_key (0 ), (Rid){old_node->GetPageNo (), -1 }); new_root->insert_pair (1 , key, (Rid){new_node->GetPageNo (), -1 }); int new_root_page = new_root->GetPageNo (); file_hdr_.root_page = new_root_page; new_node->page_hdr->parent = new_root_page; old_node->page_hdr->parent = new_root_page; } else { IxNodeHandle *parent_node = FetchNode (old_node->page_hdr->parent); parent_node->insert_pair (parent_node->find_child (old_node) + 1 , key, (Rid){new_node->GetPageId ().page_no, -1 }); if (parent_node->page_hdr->num_key == parent_node->GetMaxSize ()){ IxNodeHandle *new_parent_node = Split (parent_node); InsertIntoParent (parent_node, new_parent_node->get_key (0 ), new_parent_node, transaction); buffer_pool_manager_->UnpinPage (new_parent_node->GetPageId (), true ); } buffer_pool_manager_->UnpinPage (parent_node->GetPageId (), true ); } }
首先判断旧节点是不是根节点,如果是根节点则分裂后需要指向一个新的根节点,并且需要进行根节点的初始化工作,B+树的深度加深一层。如果不是根节点则找到父节点进行插入,如果父节点也满了那就循环递归。
parent_node->insert_pair(parent_node->find_child(old_node) + 1, key, (Rid){new_node->GetPageId().page_no, -1});
此处是将新节点的0key插入父节点,需要找到父节点中旧节点的位置pos,pos+1
处才为新节点的位置。
任务2.3 B+树的删除 任务要求 (1)结点内的删除 1 2 3 4 5 class IxNodeHandle { void erase_pair (int pos) ; int Remove (const char *key) ; }
(2)B+树的删除 1 2 3 4 5 6 7 8 class IxIndexHandle { void delete_entry (const char *key, Transaction *transaction) ; bool CoalesceOrRedistribute (IxNodeHandle *node, Transaction *transaction) ; bool Coalesce (IxNodeHandle **neighbor_node, IxNodeHandle **node, IxNodeHandle **parent, int index, Transaction *transaction) ; void Redistribute (IxNodeHandle *neighbor_node, IxNodeHandle *node, IxNodeHandle *parent, int index) ; bool AdjustRoot (IxNodeHandle *old_root_node) ; }
通关截图
代码分析 (1)结点内的删除 ix_node_handle.cpp
erase_pair函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 void IxNodeHandle::erase_pair (int pos) { memmove (get_key (pos), get_key (pos + 1 ), (page_hdr->num_key - pos - 1 ) * file_hdr->col_len); memmove (get_rid (pos), get_rid (pos + 1 ), (page_hdr->num_key - pos - 1 ) * sizeof (Rid)); page_hdr->num_key --; }
pos+1
及后面的记录向前移动一个位置,覆盖掉pos处需要删除的记录
remove函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 int IxNodeHandle::Remove (const char *key) { int pos = lower_bound (key); if (ix_compare (key, get_key (pos), file_hdr->col_type, file_hdr->col_len) == 0 ){ erase_pair (pos); return page_hdr->num_key; } else { return -1 ; } }
找到第一个大于等于的节点,判断是否相等。如果不相等则说明删除项不存在,返回-1。如果相等则调用erase_pair进行删除操作。
(2)B+树的删除 ix_index_handle.cpp
delete_entry函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 bool IxIndexHandle::delete_entry (const char *key, Transaction *transaction) { std::scoped_lock lock{root_latch_}; IxNodeHandle *leaf = FindLeafPage (key, Operation::DELETE, transaction); if (leaf->Remove (key) != -1 ){ CoalesceOrRedistribute (leaf, transaction); buffer_pool_manager_->UnpinPage (leaf->GetPageId (), true ); return true ; } else { buffer_pool_manager_->UnpinPage (leaf->GetPageId (), false ); return false ; } }
获取key值对应的叶节点,调用remove函数删除,如果删除失败,则返回false,删除成功就调用CoalesceOrRedistribute函数来进行合并或重分配操作,并返回true;
CoalesceOrRedistribute函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 bool IxIndexHandle::CoalesceOrRedistribute (IxNodeHandle *node, Transaction *transaction) { if (node->IsRootPage ()){ return AdjustRoot (node); } else { IxNodeHandle *parent_node = FetchNode (node->GetParentPageNo ()); int pos = parent_node->find_child (node); IxNodeHandle *brother_node = pos != 0 ? FetchNode (node->GetPrevLeaf ()) : FetchNode (node->GetNextLeaf ()); if (node->GetSize () + brother_node->GetSize () >= node->GetMinSize () * 2 ){ Redistribute (brother_node, node, parent_node, pos); buffer_pool_manager_->UnpinPage (parent_node->GetPageId (), true ); buffer_pool_manager_->UnpinPage (brother_node->GetPageId (), true ); return false ; } else { Coalesce (&brother_node, &node, &parent_node, pos, transaction); buffer_pool_manager_->UnpinPage (parent_node->GetPageId (), true ); buffer_pool_manager_->UnpinPage (brother_node->GetPageId (), true ); return true ; } } return false ; }
如果node节点是根节点,则调用adjustroot函数。如果不是根节点,获取父节点和兄弟节点。其中如果node处于0处,没有前置节点,则取后置节点,其余pos取前置节点。如果node结点和兄弟结点的键值对数量之和,能够支撑两个B+树结点,即数量之和大于双倍的最小节点数量大小,则只需重新分配即可。否则需要进行合并操作。
AdjustRoot函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 bool IxIndexHandle::AdjustRoot (IxNodeHandle *old_root_node) { if (old_root_node->IsLeafPage () == 0 && old_root_node->GetSize () == 1 ){ IxNodeHandle *new_root = FetchNode (old_root_node->ValueAt (0 )); new_root->SetParentPageNo (INVALID_PAGE_ID); file_hdr_.root_page = new_root->GetPageNo (); release_node_handle (*old_root_node); buffer_pool_manager_->UnpinPage (new_root->GetPageId (), true ); return true ; } else if (old_root_node->IsLeafPage () == 1 && old_root_node->GetSize () == 0 ){ file_hdr_.root_page = INVALID_PAGE_ID; release_node_handle (*old_root_node); return true ; } return false ; }
如果旧根节点是内部节点,并且删除后大小为1,由于其还有孩子节点,孩子节点接替旧根节点成为新的根节点。如果旧根节点是叶节点,并且删除后大小为0,那么整根树都为空,直接将root_page置空即可。
Redistribute函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 void IxIndexHandle::Redistribute (IxNodeHandle *neighbor_node, IxNodeHandle *node, IxNodeHandle *parent, int index) { if (index == 0 ){ node->insert_pair (node->page_hdr->num_key, neighbor_node->get_key (0 ), *neighbor_node->get_rid (0 )); neighbor_node->erase_pair (0 ); maintain_child (node, node->page_hdr->num_key - 1 ); maintain_parent (neighbor_node); } else { int neighbor_pos = neighbor_node->page_hdr->num_key -1 ; node->insert_pair (0 , neighbor_node->get_key (neighbor_pos), *neighbor_node->get_rid (neighbor_pos)); neighbor_node->erase_pair (neighbor_pos); maintain_child (node, 0 ); maintain_parent (node); } }
根据兄弟节点为前驱后驱分成两种情况讨论。由于是node节点被删除,所以需要neighbor_node
让出多余的键值对给node节点。需要注意第page_hdr->num_key
个键值对位于page_hdr->num_key - 1
处。
如果兄弟节点是前驱节点,那么neighbor_node让出最后一个键值对给node,放在node的最前面。由于node节点第一个键值对发生变化,则需要使用maintain_parent函数。
如果兄弟节点是后驱节点,那么neighbor_node让出第一个键值对给node,放在node的最后。由于neighbor_node节点第一个键值对发生变化,则需要使用maintain_parent函数。
Coalesce函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 bool IxIndexHandle::Coalesce (IxNodeHandle **neighbor_node, IxNodeHandle **node, IxNodeHandle **parent, int index, Transaction *transaction) { if (index == 0 ){ std::swap (*neighbor_node, *node); index++; } int before_num = (*neighbor_node)->GetSize (); (*neighbor_node)->insert_pairs (before_num, (*node)->get_key (0 ), (*node)->get_rid (0 ), (*node)->GetSize ()); int after_num = (*neighbor_node)->GetSize (); for (int i = before_num; i < after_num; ++i) maintain_child (*neighbor_node, i); if ((*node)->GetPageNo () == file_hdr_.last_leaf){ file_hdr_.last_leaf = (*neighbor_node)->GetPageNo (); } erase_leaf (*node); release_node_handle (**node); (*parent)->erase_pair (index); return CoalesceOrRedistribute (*parent, transaction); }
任务2.4 B+树索引并发控制 任务要求 本任务要求修改IxIndexHandle
类的原实现逻辑,让其支持对B+树索引的并发 查找、插入、删除操作。
通关截图