leveldb学习记录-skiplist

leveldb中memtable的实现是采用skiplist跳表的,了解skiplist这种数据结构,便于我们分析理解memtable。

1.skiplist跳表的数据结构

SkipList是一种用来代替平衡树的数据结构。
虽然在最坏的情况下SkipList的效率要低于平衡树,但是大多数情况下效率仍然非常高,其插入、删除、查找的时间复杂度都是O(log(N))。
除了高效外,其实现和维护非常简单也是一大优势。SkipList的使用还是比较广泛的,比如在LevelDB中的MemTable就是使用SkipList实现的。

1
skiplist是多层的有序链表

查找

以查找19为例

image-20201221194025728

先从最上层的跳跃区间大的层开始查找。从头结点开始,首先和23进行比较,小于23,(此时查找指针在图中“1”位置处),查找指针到下一层继续查找。

然后和9进行判断,大于9,查找指针再往前走一步和23比较,小于23,(此时查找指针在图中“2”位置处) 此时这个值肯定在9结点和23结点之间。查找指针到下一层继续查找。

然后和13进行判断,大于13,查找指针再往前走一步和23比较,小于23,(此时查找指针在图中“3”位置处)此时这个值肯定在13结点和23结点之间。查找指针到下一层继续查找。此时,我们和19进行判断,找到了。

插入

包含以下几个操作:

1、查找到需要插入的位置

2、申请新的节点

3、调整指针

因为在找到插入点之后,新生成节点,新节点按概率出现在每层上,故需要保存所有层的后续指针,这里用一个临时数组保存所有层的插入点处的后续指针。

image-20201221194105759

以上图为例,现在要插入15这个节点。
其过程和查找类似,唯一的问题是,前面的节点的指针是如何保留下来的?

我们可以看到插入结束后,13的level=0的指针指向了15,13的level=1的指针指向了15。
这就意味着,在插入的时候我们就需要保留13的level=0的指针和13的level=1的指针。

在SkipList中是这样做的,有一个update数组,这个数组的大小为maxLevel。
还是以上图为例,在15插入之前,update这个数组中就已经存储了当前每个level的指针。

  • update[0]:13 level=0
  • update[1]:13 level=1
  • update[2]:9 level=2
  • update[3]:null level=3

skiplist的删除

删除的逻辑和插入类似

1、查找到相应的节点

2、通过update数组来实现该节点的逻辑删除

3、回收该节点资源

2.skiplist类介绍

skiplist是一个模板类

1
2
template <typename Key, class Comparator>
class SkipList

其中key为要存储的数据类型,Comparator实现key之间的比较。

skiplist的完整定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
template <typename Key, class Comparator>
class SkipList {
private:
struct Node;

public:
// Create a new SkipList object that will use "cmp" for comparing keys,
// and will allocate memory using "*arena". Objects allocated in the arena
// must remain allocated for the lifetime of the skiplist object.
explicit SkipList(Comparator cmp, Arena* arena);

SkipList(const SkipList&) = delete;
SkipList& operator=(const SkipList&) = delete;

// Insert key into the list.
// REQUIRES: nothing that compares equal to key is currently in the list.
void Insert(const Key& key);

// Returns true iff an entry that compares equal to key is in the list.
bool Contains(const Key& key) const;

// Iteration over the contents of a skip list
class Iterator {
public:
// Initialize an iterator over the specified list.
// The returned iterator is not valid.
explicit Iterator(const SkipList* list);

// Returns true iff the iterator is positioned at a valid node.
bool Valid() const;

// Returns the key at the current position.
// REQUIRES: Valid()
const Key& key() const;

// Advances to the next position.
// REQUIRES: Valid()
void Next();

// Advances to the previous position.
// REQUIRES: Valid()
void Prev();

// Advance to the first entry with a key >= target
void Seek(const Key& target);

// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToFirst();

// Position at the last entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToLast();

private:
const SkipList* list_;
Node* node_;
// Intentionally copyable
};
private:
enum { kMaxHeight = 12 };

inline int GetMaxHeight() const {
return max_height_.load(std::memory_order_relaxed);
}

Node* NewNode(const Key& key, int height);
int RandomHeight();
bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }

// Return true if key is greater than the data stored in "n"
bool KeyIsAfterNode(const Key& key, Node* n) const;

// Return the earliest node that comes at or after key.
// Return nullptr if there is no such node.
//
// If prev is non-null, fills prev[level] with pointer to previous
// node at "level" for every level in [0..max_height_-1].
Node* FindGreaterOrEqual(const Key& key, Node** prev) const;

// Return the latest node with a key < key.
// Return head_ if there is no such node.
Node* FindLessThan(const Key& key) const;

// Return the last node in the list.
// Return head_ if list is empty.
Node* FindLast() const;

// Immutable after construction
Comparator const compare_;
Arena* const arena_; // Arena used for allocations of nodes

Node* const head_;

// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
std::atomic<int> max_height_; // Height of the entire list

// Read/written only by Insert().
Random rnd_;
};

由skiplist定义,发现其只提供了InsertContains两个接口。

即插入和查找。Contains用于判断是否在list中存在与key值相等的entry(即一条key-value记录)。没有提供Delete接口,因为在leveldb中删除操作就是插入一个标记某个kv对为删除的节点。

3.insert操作

insert和contains是skiplist比较核心的部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
template <typename Key, class Comparator>
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev);

// Our data structure does not allow duplicate insertion
//leveldb跳表要求不能插入重复数据
assert(x == nullptr || !Equal(key, x->key));

int height = RandomHeight(); //随机获取一个level值
if (height > GetMaxHeight()) { //GetMaxHeight为获取跳表当前高度
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
// 此处不用为并发读加锁。因为并发读在(在另外线程中通过 FindGreaterOrEqual 中的 GetMaxHeight)
// 读取到更新后跳表层数,但该节点尚未插入时也无妨。因为这意味着它会读到 nullptr,而在 LevelDB
// 的 Comparator 设定中,nullptr 比所有 key 都大。因此,FindGreaterOrEqual 会继续往下找,
// 符合预期。
max_height_.store(height, std::memory_order_relaxed);
}

x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
// 此句 NoBarrier_SetNext() 版本就够用了,因为后续 prev[i]->SetNext(i, x) 语句会进行强制同步。
// 并且为了保证并发读的正确性,一定要先设置本节点指针,再设置原条表中节点(prev)指针
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}

采用FindGreaterOrEqual函数确定待插入的位置,采用prev记录每一层最大一个< key的节点,也就是待插入节点的前驱节点。

通过RandomHeight随机生成此次插入节点的高度,

插入新节点。

FindGreaterOrEqual函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
if (KeyIsAfterNode(key, next)) {//如果next->key < key
// Keep searching in this list
x = next;
} else {//如果next->key >= key,如果单纯为了判断是否相等,这里可以加一个判断直接返回了,没必要level--到0再返回
if (prev != nullptr) prev[level] = x;//prev记录该level最后一个<key的节点
if (level == 0) {//到达最底层则返回next (next是第一个>=key的节点)
return next;
} else {
// Switch to next list
level--;
}
}
}
}

FindGreaterOrEqual 查找并返回第一个大于等于 key 的节点。如果是查找后需要进行插入,需要记录下这个节点的 prev 指针。

插入的高度值是怎么确定的呢?

1
int RandomHeight();//通过抛硬币的方法(1/4概率)决定高度值,范围[1, kMaxHeight]

Contains函数

1
2
3
4
5
6
7
8
9
10
11
12
13
template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Contains(const Key& key) const {
//x记录第一个>= key的Node
//注意FindGreaterOrEqual是查找>=key的Node,因此会迭代直到level = 0才返回
//实际上可以实现一个接口直接查找==key的Node,这样会在level >=0 时就能返回,查找的时间复杂度不变,不过可以预见减少比较次数。
Node* x = FindGreaterOrEqual(key, nullptr);
//判断x->key == key
if (x != nullptr && Equal(key, x->key)) {
return true;
} else {
return false;
}
}

这个函数是通过FindGreaterOrEqual找到节点,如果存在该节点,且key值相同,就return true.Contains 查找 SkipList 是否包含某个 key.

FindLessThan和FindLast

Contains Insert都用到了FindGreaterOrEqual,leveldb 还提供了:

  1. FindLessThan:查找最大的一个< key的节点,如果不存在返回head_
  2. FindLast:查找最后一个节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;//获取当前最大height
while (true) {
assert(x == head_ || compare_(x->key, key) < 0);
Node* next = x->Next(level);
if (next == nullptr || compare_(next->key, key) >= 0) {//如果key >= next->key
if (level == 0) {
return x;
} else {
// Switch to next list
level--;
}
} else {
x = next;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindLast()
const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
if (next == nullptr) {//到达最后一个节点
if (level == 0) {//level=0,则返回
return x;
} else {
// Switch to next list
// level>0则到下一层
level--;
}
} else {
//不是最后一个节点则继续递增
x = next;
}
}
}

4.iterator

memtable在读取时采用的是SkipList::Iterator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Iteration over the contents of a skip list
class Iterator {
public:
// Initialize an iterator over the specified list.
// The returned iterator is not valid.
explicit Iterator(const SkipList* list);

// Returns true iff the iterator is positioned at a valid node.
bool Valid() const;

// Returns the key at the current position.
// REQUIRES: Valid()
const Key& key() const;

// Advances to the next position.
// REQUIRES: Valid()
void Next();

// Advances to the previous position.
// REQUIRES: Valid()
void Prev();

// Advance to the first entry with a key >= target
void Seek(const Key& target);

// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToFirst();

// Position at the last entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToLast();

private:
const SkipList* list_;
Node* node_;
// Intentionally copyable
};

分析源码,发现此迭代器接口跟普通迭代器基本一样,内部私有成员包含一个Skiplist和一个node。接口实现都较为简单,有的是直接调用上面提到的FindGreaterOrEqualFindLessThanFindLast

5.总结

randomHeight,之前介绍抛硬币的方法概率为1/2,这里使用的是1/4,结合kBranching = 4kMaxHeight = 12,不影响复杂度的情况下,可以最多支持4^11 = 4194304个节点,因此在百万节点左右,这么设置参数效果最优。