leveldb学习记录-compaction从memtable到sstable

前面讲了SSTable文件,这种文件是leveldb的最终形态、落盘文件,在该文件中,key-value是有序的,它位于磁盘上。很明显客户输入的key-value对的顺序是不可预知的,是无序,因此SSTable肯定不是直接对应用户的put key-value操作。

直接应对客户put操作的数据结构是MemTable,它是一个存在在内存中的数据结构,内部也是有序的,使用SkipList来组织用户输入的key-value对。(为了防范异常掉电,所以引入了log文件)。

这里面就存在一个问题了,用户的输入的数据暂时记录在MemTable,而最终落在磁盘上的是SSTable文件。那问题就来了:

  • 怎么将内存中的SkipList组织的MemTable中的数据dump到SSTable file,
  • 什么时机做这个dump 操作。

这就牵扯到LevelDB中很重要的一个概念,Compaction

==LevelDB中,Compaction分成两种:==

  • minor compaction
  • major compaction

minor compaction涉及到将Memtable中的key-value对dump到磁盘的sstable文件。在合适的时候,将Immutable MemTable dump到磁盘,形成SSTable。

而major compaction涉及的是磁盘中各个level之间的sstable文件的compaction。

1.minor compaction整体流程

入口是 DBImpl::MaybeScheduleCompaction()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (background_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == nullptr && manual_compaction_ == nullptr &&
!versions_->NeedsCompaction()) {
// No work to be done
// 防止无限递归,会判断需不需要再次compaction,如果不需要,递归结束
} else {
background_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
}
}

void DBImpl::BGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(background_compaction_scheduled_);
if (shutting_down_.load(std::memory_order_acquire)) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
// No more background work after a background error.
} else {
BackgroundCompaction();//关键部分 BackgroundCompaction
}

background_compaction_scheduled_ = false;

// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
MaybeScheduleCompaction();
background_work_finished_signal_.SignalAll();
}

整体流程图

调用

猛然一看,是一个无限递归函数,从MaybeScheduleCompaction进入,但是后面调用的函数再次调用了MaybeScheduleCompaction函数。但其实在最后if有一个判断,imm_ == NULL && manual_compaction_ == NULL &&!versions_->NeedsCompaction();不会无限递归。

  • imm_ == NULL表示没有Immutable MemTable需要dump成SST
  • manual_compaction_ == NULL 表示不是手动调用DBImpl::CompactRange,没有人工触发
  • versions_->NeedsCompaction 来 判断是是否需要进一步发起Compaction
1
2
3
4
5
// Returns true iff some level needs a compaction.
bool NeedsCompaction() const {
Version* v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
}

BackGroundCall()函数中的注释

1
2
// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.

意思是第一轮的compaction可能会产生出很多files,需要再发起一次compaction。而是否需要就通过versions_->NeedsCompaction来判断。

下面来看看真正compaction的函数:BackgroundCompaction

触发条件:

1
2
3
4
5
6
7
8
9
10
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();//如果当前线程持有互斥锁,则不执行任何操作

if (imm_ != NULL) {
CompactMemTable();//minor compaction,//将imm_写到level 0
return;
}

...
}
CompactMemTable函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void DBImpl::CompactMemTable() {
mutex_.AssertHeld();
assert(imm_ != nullptr);

// Save the contents of the memtable as a new Table 将memtable的内容另存为新表
VersionEdit edit;
Version* base = versions_->current();
base->Ref();
Status s = WriteLevel0Table(imm_, &edit, base); //将数据写到L0
base->Unref();

if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
s = Status::IOError("Deleting DB during memtable compaction");
}

// Replace immutable memtable with the generated Table
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_->LogAndApply(&edit, &mutex_); //早期的log不再需要,将生成的Version Edit到当前VersionSet中
}

if (s.ok()) {
// Commit to the new state
imm_->Unref(); //计数-1,引用计数为0时会delete当前imm_
imm_ = nullptr;
has_imm_.store(false, std::memory_order_release);
RemoveObsoleteFiles();
} else {
RecordBackgroundError(s);
}
}

这里引用其他博客里的一张调用流程图如下:

调用流程图

WriteLevel0Table函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Version* base) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
meta.number = versions_->NewFileNumber();
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator();
Log(options_.info_log, "Level-0 table #%llu: started",
(unsigned long long)meta.number);

Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);//将imm dump到sstable中
mutex_.Lock();
}

Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
(unsigned long long)meta.number, (unsigned long long)meta.file_size,
s.ToString().c_str());
delete iter;
pending_outputs_.erase(meta.number);

// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
int level = 0;
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
if (base != nullptr) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); //选择sstable写到哪个level
}
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest); //添加本次sstable的Filemeta 生成VersionEdit,给manifest文件记录。
}

CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;
stats_[level].Add(stats);
return s;
}

这个函数主要包括三个部分,

  • BuildTable:将imm dump到磁盘的sstable中
  • PickLevelForMemTableOutput:根据最小key和最大key计算新生成的sstable应该位于哪一层
  • edit->AddFile:AddFile**是记录{level, FileMetaData}对到new_files_(一个vector)中。

其中最重要的BuildTable就是将immutable memtable的内容写到sstable 文件中,sstable文件的命令是由一些列数组+.ldb构成的

sstable

这个数字是由VersionSet分配的,meta->number的赋值语句在DBImpl::WriteLevel0Table中的 meta.number = versions_->NewFileNumber();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 meta.number = versions_->NewFileNumber();


class VersionSet {
public:
VersionSet(const std::string& dbname,
const Options* options,
TableCache* table_cache,
const InternalKeyComparator*);
~VersionSet();
...

// Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_++; }
...

}
BuildTable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
Status s;
meta->file_size = 0;
iter->SeekToFirst();

std::string fname = TableFileName(dbname, meta->number);
if (iter->Valid()) {
WritableFile* file;
s = env->NewWritableFile(fname, &file);
if (!s.ok()) {
return s;
}

TableBuilder* builder = new TableBuilder(options, file); //新建sstable
meta->smallest.DecodeFrom(iter->key()); //第一个key是最小的
Slice key;
for (; iter->Valid(); iter->Next()) {
key = iter->key();
builder->Add(key, iter->value()); //向sstable添加key
}
if (!key.empty()) {
meta->largest.DecodeFrom(key); //最后一个key是最大的
}

// Finish and check for builder errors
s = builder->Finish(); //写入sstable的其他block,filter block、metaindex block、index block
if (s.ok()) {
meta->file_size = builder->FileSize();
assert(meta->file_size > 0);
}
delete builder;

// Finish and check for file errors
if (s.ok()) {
s = file->Sync(); //写入
}
if (s.ok()) {
s = file->Close();
}
delete file;
file = nullptr;

if (s.ok()) {
// Verify that the table is usable
Iterator* it = table_cache->NewIterator(ReadOptions(), meta->number,
meta->file_size);
s = it->status();
delete it;
}
}

// Check for input iterator errors
if (!iter->status().ok()) {
s = iter->status();
}

if (s.ok() && meta->file_size > 0) {
// Keep it
} else {
env->RemoveFile(fname);
}
return s;
}

BuildTable函数实现位于db/build.cc中,该函数产生了SSTable文件。首先会新建一个sstable文件,然后根据iter,向sstable添加数据,然后写入磁盘中。

PickLevelForMemTableOutput

新产生出来的sstable 并不一定总是处于level 0, 尽管大多数情况下,处于level 0。新创建的出来的sstable文件应该位于那一层呢? 由PickLevelForMemTableOutput 函数来计算。

从策略上要尽量将新compact的文件push高level,毕竟在level 0 需要控制文件过多,compaction IO和查找都比较耗费,另一方面也不能放到过高的level,一定程度上控制查找的次数,而且若某些范围的key更新比较频繁,后续往高层compaction IO消耗也很大,读性能会降低。 所以PickLevelForMemTableOutput就是个权衡折中。

如果新生成的sstable和Level 0的sstable有交叠,那么新产生的sstable就直接加入level 0,否则根据一定的策略,向上推到Level1 甚至是Level 2,但是最高推到Level2,这里有一个控制参数:kMaxMemCompactLevel

1
2
3
4
5
6
7
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap. We try to push to level 2 to avoid the
// relatively expensive level 0=>1 compactions and to avoid some
// expensive manifest file operations. We do not push all the way to
// the largest level since that can generate a lot of wasted disk
// space if the same key space is being repeatedly overwritten.
static const int kMaxMemCompactLevel = 2;

由kMaxMemCompactLevel定义来看,sstable最多被放到level2。

现在来看看PickLevelForMemTableOutput函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int Version::PickLevelForMemTableOutput(const Slice& smallest_user_key,
const Slice& largest_user_key) {
int level = 0;
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) { //如果sstable文件与L0有重叠,直接加入L0
// Push to next level if there is no overlap in next level,
// and the #bytes overlapping in the level after that are limited.
InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
std::vector<FileMetaData*> overlaps;
while (level < config::kMaxMemCompactLevel) {//最高判断到L2 while 循环寻找合适的level层级,最大level为2,不能更大
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
break;
}
if (level + 2 < config::kNumLevels) { //kNumLevels=7
// Check that file does not overlap too many grandparent bytes.
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const int64_t sum = TotalFileSize(overlaps);
if (sum > MaxGrandParentOverlapBytes(vset_->options_)) {
break;
}
}
level++;
}
}
return level;
}

程序流程图:

流程图

有一点,是如何判断和不同的level的sstable文件有没有重叠的呢?主要有以下原因:

  • SSTable中的key-pair是有序的,给我一个最小的key和一个最大的key,就足以描述该文件中key的范围
  • 数据结构 FileMetaData,描述了各个文件的名字,最小key 最大key,文件的大小等信息。
edit->AddFile
1
2
3
4
5
6
7
8
9
10
11
12
// Add the specified file at the specified number.
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
// REQUIRES: "smallest" and "largest" are smallest and largest keys in file
void AddFile(int level, uint64_t file, uint64_t file_size,
const InternalKey& smallest, const InternalKey& largest) {
FileMetaData f;
f.number = file;
f.file_size = file_size;
f.smallest = smallest;
f.largest = largest;
new_files_.push_back(std::make_pair(level, f));
}

从 edit->AddFile 可知,一个 SSTable 对应有一个 FileMeta 存放在 edit 中,edit 最终会存放在 manifest,同时 edit +原始的version最终会演变成新的 version,version 又会加入到 versioneset 中。

2.何时触发compaction呢?

看下上面提到的MaybeScheduleCompaction函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (background_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
} else if (imm_ == nullptr && manual_compaction_ == nullptr &&
!versions_->NeedsCompaction()) {
// No work to be done
// 防止无限递归,会判断需不需要再次compaction,如果不需要,递归结束
} else {
background_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
}
}
  • 当imm_ != NULL表示有Immutable MemTable需要dump成SST
  • manual_compaction_ != NULL 表示会手动调用DBImpl::CompactRange,人工触发compaction
  • versions_->NeedsCompaction 来 判断是是否需要进一步发起Compaction

触发条件:来看NeedsCompaction ()函数,这里又分了两种情况,文件数目过多或者某层级文件总大小过大, 引起compaction

1
2
3
4
5
// Returns true iff some level needs a compaction.
bool NeedsCompaction() const {
Version* v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
}

先看看v->compaction_score_,

2.1 文件数目过多或者某层级文件总大小过大,引起compacction

compaction_score_和Finalize函数

那么什么情况下会重新计算compaction_score_呢,利用VersionSet的Finalize函数,会遍历各个level的文件数目和该level所有文件的总大小,给各个level打个分,如果没有一个level的分数是大于等于1,表示任何一个层级都不需要Compaction,但是如果存在某个或者某几个层级的score大于等于1,选择分最高的那个level。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;

for (int level = 0; level < config::kNumLevels-1; level++) {
double score;
if (level == 0) {
// We treat level-0 specially by bounding the number of files
// instead of number of bytes for two reasons:
//
// (1) With larger write-buffer sizes, it is nice not to do too
// many level-0 compactions.
//
// (2) The files in level-0 are merged on every read and
// therefore we wish to avoid too many files when the individual
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger);
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score =
static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
}

if (score > best_score) {
best_level = level;
best_score = score;
}
}

v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}

这里需要注意,level0是根据文件数目来计算score,而其他层级(level 1~level 6)是根据该层级所有文件的总大小来计算score

对于level 0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
score =  (level 0 文件总数目) / config::kL0_CompactionTrigger

static const int kL0_CompactionTrigger = 4;// Level-0 compaction is started when we hit this many files.

if (level == 0) {
// We treat level-0 specially by bounding the number of files
// instead of number of bytes for two reasons:
//
// (1) With larger write-buffer sizes, it is nice not to do too
// many level-0 compactions.
//
// (2) The files in level-0 are merged on every read and
// therefore we wish to avoid too many files when the individual
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger);
}

对于 level 1~ level 6,则是根据当前文件的大小与该level大小的理论上限的比值

1
2
3
4
5
6
7
8
score = (该level 所有文件的总大小)/ (该level的大小的理论上限:MaxBytesForLevel)

else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score =
static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
}
1
2
3
4
5
6
7
8
9
10
11
12
static double MaxBytesForLevel(const Options* options, int level) {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.

// Result for both level-0 and level-1
double result = 10. * 1048576.0; //默认是10M
while (level > 1) {
result *= 10;
level--;
}
return result;
}

这里说一下为什么L0跟其他level不一样

看到一篇博客的解释:

注释说的很明白,level 0的文件之间,key可能是交叉重叠的,因此不希望level 0的文件数特别多。我们考虑write buffer 比较小的时候,如果使用size来限制,那么level 0的文件数可能太多。

另一个方面,如果write buffer过大,使用固定大小的size 来限制level 0的话,可能算出来的level 0的文件数又太少,触发 level 0 compaction的情况发生的又太频繁。所以L0根据文件数目来计算score,进行compaction。默认L0的文件数量为4

1
2
// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;

L1-L6中各层之间的T=10,也就是下一层的大小是上一层的10倍。比如L1=10M,L2=100M, L3=1000M,

1
2
3
4
5
6
level 1               10M 
level 2 100M
level 3 1000M
level 4 10000M
level 5 100000M
level 6 1000000M

其会选择score高的level来做compaction

1
2
3
4
if (score > best_score) {
best_level = level;
best_score = score;
}

得分越高,说明该层级触发compaction的要求就越迫切, v->compaction_level_ 就会设置成得分最高的那个层级。

所以当level0文件数过多,或者L1-L6的总file size过大时,会触发compaction。

2.2 seek次数太多,触发compaction

file_to_compact_ & Seek Compaction

随着时间的流逝,LevelDB各个层级都有多个文件。剔除level 0不论,对于任何一个层级来说,层级的内的任意一个文件本身是有序的,而位于同一层级的内部的多个文件,他们也是有序的,而且key是不交叉的。

但是很不幸的是,level n 和level n+1的文件,key的范围可能交叉,这种交叉,就可能带来 seek miss,即数据有可能位于level n的某个文件中(根据该文件的最小key和最大key和用户要查找的key来推算),但是实际情况是并不在level n的该文件中,不得不去level n+1的文件查找。这种seek miss不解决,就会造成查询效率的下降。

除了level 0以外,任何一个level的文件内部是有序的,文件之间也是有序的。但是level(n)和level(n+1)中的两个文件的key可能存在交叉。正是因为这种交叉,查找某个key值的时候,level(n) 的查找无功而返,而不得不去level(n+1)查找。如果查找了多次,某个文件不得不查找,却总也找不到,总是去高一级的level,才能找到。这说明该层级的文件和上一级的文件,key的范围重叠的很严重,这是不合理的,会导致效率的下降。因此,需要对该level 发起一次major compaction,减少 level 和level + 1的重叠情况。

这就是所谓的 Seek Compaction。对于seek触发的compaction, 哪个文件无效seek的次数到了阈值,那个文件就是level n的参与compaction的文件。而size 触发的compaction稍微复杂一点,它需要考虑上一次compaction做到了哪个key,什么地方,然后大于该key的第一个文件即为level n的参与compaction的文件。

对于n >0的情况,初选情况下level n的参与compaction文件只会有1个,如果n=0,因为level 0的文件之间,key可能交叉重叠,因此,根据选定的level 0的该文件,得到该文件负责的最小key和最大key,找到所有和这个key 区间有交叠的level 0文件,都加入到参战文件。

LevelDb在选定某个level进行compaction后,还要选择是具体哪个文件要进行compaction,比如这次是文件A进行compaction,那么下次就是在key range上紧挨着文件A的文件B进行compaction,这样每个文件都会有机会轮流和高层的level 文件进行合并。

如果选好了level L的文件A和level L+1层的文件进行合并,那么问题又来了,应该选择level L+1哪些文件进行合并?levelDb选择L+1层中和文件A在key range上有重叠的所有文件来和文件A进行合并。也就是说,选定了level L的文件A,之后在level L+1中找到了所有需要合并的文件B,C,D…..等等。剩下的问题就是具体是如何进行major 合并的?就是说给定了一系列文件,每个文件内部是key有序的,如何对这些文件进行合并,使得新生成的文件仍然Key有序,同时抛掉哪些不再有价值的KV 数据。

3.compaction的实现

compaction的实现部分位于DBImpl::BackgroundCompaction()函数,包括了minor compaction和major compaciton。

3.1 PickCompaction()

代码首先判断是否需要CompactMemTable(),判断imm_是否为空,不为空则执行CompactMemTable(),minor compaction操作

1
2
3
4
if (imm_ != nullptr) {   //如果有转换的memtable,直接将memtable写入sstable即返回
CompactMemTable(); //将imm_写到level 0
return;
}

如果immutable 不存在,则merge各层leveldb的sstable,也就是major compaction,可以知道,minor compaction的优先级高于major compaction。

然后就是执行major compaction,一般数据库的compaction操作都为自动触发,不使用手动触发,所以这里直接认为执行else{…}中PickCompaction()挑选合适参与compaction的所有sstable。保存在C中

1
2
3
4
5
6
7
8
Compaction* c;
bool is_manual = (manual_compaction_ != nullptr);
InternalKey manual_end;
if (is_manual) { //用户(主动)手动触发的compaction
XXXXXXX
} else {
c = versions_->PickCompaction(); //找出最合适compaction的level,自动触发
}

接下来我们来看看PickCompaction()函数。Compaction* VersionSet::PickCompaction()简言来说,就是选取一层需要compaction的sstable文件列表,及相关的下层sstable文件列表,记录在Compaction*返回。

上面我们提到major compaction除了包括手动触发的compaction外,还包括文件数目过多或某个level文件总大小过大触发compaction,也就是size compaction;和seek次数过多,触发的compaction,也就是seek compaction。

源码部分,首先根据size_compaction和seek_compaction计算待compaction的文件。

当compaction_score_>=1时,触发size compaction。首先是找到该层第一个满足条件的sstable,// Pick the first file that comes after compact_pointer_[level]

std::string compact_pointer_[config::kNumLevels];compact_pointer_定义的是string类型,记录了该层上次compaction时文件的最大key值, 定义compact_pointer_的初始值为空,也就是选择该层的第一个sstable文件。

此时input_[0]中有且仅有一个文件,还需要在L0中找到还有哪些文件与初始的一个文件有重叠,把这些文件都加进来,再通过GetRange获得输入文件的key range,得到最小key和最大key,也就是文件的范围。经过这一步之后,其实是要满足等待compaction的sst文件在各个level都满足这一条件:inputs_[0]中的sst文件跟该层的其他文件之间没有overlap(key重叠)。

// inputs_[0]为 level-n 的 sstable 文件信息,

// inputs_[1]为 level-n+1 的 sstable 文件信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;

// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
const bool size_compaction = (current_->compaction_score_ >= 1);
const bool seek_compaction = (current_->file_to_compact_ != nullptr);
if (size_compaction) { /*size compaction是第一种情况,根据size来决定是否发起compaction,从which层级发起compaction*/
level = current_->compaction_level_;
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
c = new Compaction(options_, level);

// Pick the first file that comes after compact_pointer_[level]
for (size_t i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
if (compact_pointer_[level].empty() ||
icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
c->inputs_[0].push_back(f);
break;
}
}
if (c->inputs_[0].empty()) {
// Wrap-around to the beginning of the key space
c->inputs_[0].push_back(current_->files_[level][0]);
}
} else if (seek_compaction) { // /*seek_compaction 是第二种情况,无效seek 次数太多,所以依据文件以及其所属层级发起compaction*/
level = current_->file_to_compact_level_;
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->file_to_compact_);
} else {
return nullptr;
}

c->input_version_ = current_;
c->input_version_->Ref();

// Files in level 0 may overlap each other, so pick up all overlapping ones
if (level == 0) {
InternalKey smallest, largest;
GetRange(c->inputs_[0], &smallest, &largest);
// Note that the next call will discard the file we placed in
// c->inputs_[0] earlier and replace it with an overlapping set
// which will include the picked file.
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
assert(!c->inputs_[0].empty());
}

SetupOtherInputs(c); //负责计算level n+1中参与compaction的文件

return c;
}

3.2 SetupOtherInputs()

计算下一层参与compaction的sst,得到input_[1]。

在level n+1中,所有与level n得到的key的range中(smallest,largest)有重叠的sst文件,都要加进input_[1]中参与compaction。得到这些sst文件后,还要注意能否在不增加level n+1层文件的前提下,增加leveldb n层的文件。

也就是尽肯能增加level n层的文件,类似贪心算法。

根据源码来理解下SetupOtherInputs()函数

首先是计算下一层与inputs_[0] key range 有重叠的所有 sstable files,记录到inputs_[1]

1
2
3
4
5
6
7
8
9
10
11
12
13
void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
//inputs_[0]所有文件的key range -> [smallest, largest]
GetRange(c->inputs_[0], &smallest, &largest);

//inputs_[1]记录level + 1层所有与inputs_[0]有overlap的文件
current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);

// Get entire range covered by compaction
InternalKey all_start, all_limit;
//inputs_[0, 1]两层所有文件的key range -> [all_start, all_limit]
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);

根据inputs_[1]反推下 level 层有多少 key range 有重叠的文件,记录到expanded0

1
2
3
4
5
6
7
8
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
// 如果再不增加level + 1层文件的情况下,尽可能的增加level层的文件
if (!c->inputs_[1].empty()) {
std::vector<FileMetaData*> expanded0;
//level层与[all_start, all_limit]有overlap的所有文件,记录到expanded0
//expanded0 >= inputs_[0]
current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);

如果文件确实又增加,同时又不会增加太多文件(太多会导致 compact 压力过大)

1
2
3
4
5
6
7
8
const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const int64_t expanded0_size = TotalFileSize(expanded0);
//1. level 层参与compact文件数有增加
//2. 但合并的文件总量在ExpandedCompactionByteSizeLimit之内(防止compact过多)
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size <
ExpandedCompactionByteSizeLimit(options_)) {

那么就增加参与 compact 的文件,更新到inputs_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
InternalKey new_start, new_limit;
//[new_start, new_limit]记录expand0的key range
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
//如果level层文件从inputs_[0]扩展到expand0,key的范围变成[new_start, new_limit]
//看下level + 1层overlap的文件范围,记录到expand1
current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
&expanded1);
//确保level + 1层文件没有增加,那么使用心得expand0, expand1
if (expanded1.size() == c->inputs_[1].size()) {
Log(options_->info_log,
"Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
level,
int(c->inputs_[0].size()),
int(c->inputs_[1].size()),
long(inputs0_size), long(inputs1_size),
int(expanded0.size()),
int(expanded1.size()),
long(expanded0_size), long(inputs1_size));
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
c->inputs_[1] = expanded1;
GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
}

注意上述的判断逻辑,

1、根据level n的input[0]所确定的key的范围确定level n+1层的input[1]。

2、再根据input[1]反过来看能否增大input[0]

3、将增加sst后的input[0]记录到expanded0中

4、根据expanded0确定的key的range判断是否会导致input[1]增大

5、如果input1[1]还是原来的大小,则可以扩大参与compaction的level n的文件范围。

简单说,就是在不增加level n+1层文件,同时不会导致level n和level n+1参与compaction的总文件过大的前提下,尽量增加level n层的文件数。

至此,参与compaction的level n和level n+1的文件集合就已经确定了,为了避免这些文件合并到 level n+1 层后,跟 level n+2 层有重叠的文件太多,届时合并 level n+1 和 level n+2 层压力太大,因此我们还需要记录下 level n+2 层的文件,后续 compaction 时用于提前结束的判断:

1
2
3
4
5
6
7
8
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
// level + 2层有overlap的文件,记录到c->grandparents_
if (level + 2 < config::kNumLevels) {
//level + 2层overlap的文件记录到c->grandparents_
current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
&c->grandparents_);
}

然后记录compact_pointer_c->edit_,在后续PickCompaction入口时使用

1
2
3
4
5
6
7
  // Update the place where we will do the next compaction for this level.
// We update this immediately instead of waiting for the VersionEdit
// to be applied so that if the compaction fails, we will try a different
// key range next time.
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest); //记录下Compaction_pointer_,对于size compaction, 下一次要靠该值来选择 level n的参与compaction的文件
}

PickCompaction()函数的最后就是返回挑选进行compaction的文件的结果c

1
2
3
4
5
6
//选取一层需要compact的文件列表,及相关的下层文件列表,记录在Compaction*
Compaction* VersionSet::PickCompaction() {
Compaction* c;
...
return c;
}

返回Compaction * c后,紧接着有一个判断,什么情况下可以直接使用原sst文件,直接向下merge,省去了重新生成文件的过程。这里关注一下IsTrivialMove函数。

3.3 IsTrivialMove()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
bool Compaction::IsTrivialMove() const {
const VersionSet* vset = input_version_->vset_;
// Avoid a move if there is lots of overlapping grandparent data.
// Otherwise, the move could create a parent file that will require
// a very expensive merge later on.
// 同时满足以下条件时,我们只要简单的把文件从level标记到level + 1层就可以了
// 1. level层只有一个文件
// 2. level + 1层没有文件
// 3. 跟level + 2层overlap的文件没有超过25M
// 注:条件三主要是(避免move到level + 1后,导致level + 1 与 level + 2层compact压力过大)
return (num_input_files(0) == 1 && num_input_files(1) == 0 &&
TotalFileSize(grandparents_) <=
MaxGrandParentOverlapBytes(vset->options_));
}

当level 与level +1层文件没有overlap,直接merge或者叫move到下一level并不会导致错误,这样就节省了compaction重新生成文件的开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Status status;
if (c == nullptr) { //如果c为空,说明没有文件需要进行compaction,无事可做了
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level
//如果不是手动触发的,并且level中的输入文件与level+1中无重叠,且与level + 2中重叠不大于
//kMaxGrandParentOverlapBytes = 10 * kTargetFileSize,直接将文件移到level+1中,并删除level层的文件
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_); //写入version中,稍后分析
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(), versions_->LevelSummary(&tmp));
}

阅读上面的代码可以看出,如果满足c->IsTrivialMove()的条件,就只需要在edit中记录一下,

1
2
3
c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest);

然后再通过LogAndApply生效就ok了。LogAndApply主要做了以下几件事:

  1. edit应用于current_生成一个新的Version
  2. 计算新Version下,下次 major compaction 的文件
  3. 更新一些元信息管理文件
  4. 将新Version添加到双向链表,current_ = 新Version

所以,其实并没有操作sst文件,只是修改了文件对应的level。

如果pickcompaction得到的待compaction文件 compaction* c不满足以上if条件判断的内容,就直接调用实际的compaction过程,先读,再合并排序,然后写入一个新的sst到下一层level。

1
2
3
4
5
6
7
8
9
10
else {//调用实际的compaction过程
CompactionState* compact = new CompactionState(c); //c中包含需要compaction的文件的元信息
status = DoCompactionWork(compact); //否则调用DoCompactionWork进行compact输出文件
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact); //清理compaction过程中的临时变量
c->ReleaseInputs(); //清楚输入文件描述符
RemoveObsoleteFiles(); //删除无引用的文件,回收磁盘空间
}

现在,来关注以下DoCompactionWork()函数

3.4 DoCompactionWork()

Status DBImpl::DoCompactionWork(CompactionState* compact)真正的compaction,也就是多路归并过程,生成新的版本,其中compact里保存compaction的状态。

首先一个if判断来取当前最小的使用中的SequenceNumber

然后参与Compaction的sstable组成一个迭代器input。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions

Log(options_.info_log, "Compacting %d@%d + %d@%d files", //记录当前层文件数,层编号,下一层文件数,层编号
compact->compaction->num_input_files(0), compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() + 1);

assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == nullptr);
assert(compact->outfile == nullptr);
if (snapshots_.empty()) { //将snapshot相关的内容记录到compact信息中
compact->smallest_snapshot = versions_->LastSequence();
} else {
compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
}
//生成iterator,遍历所有要compaction的文件
Iterator* input = versions_->MakeInputIterator(compact->compaction);

接着,先将input定位到首位置,开启遍历。

循环判断 如果有memtable需要compaction,先compact memtable,
依次通过上面的迭代器iterator遍历所有参与compaction的文件的所有key,
循环的主体工作是判断当前迭代器对应的key是否应该加入到新合并生成的文件中。

这里要注意一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//在做真正的compaction work时不应该阻塞DB的读写操作
// Release mutex while we're actually doing the compaction work 在实际执行compaction时释放互斥锁
mutex_.Unlock();

input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
//循环判断 如果有memtable需要compact,先compact memtable
//依次通过上面的迭代器iterator遍历所有参与compaction的文件的所有key
//循环的主体工作是判断当前迭代器对应的key是否应该加入到新合并生成的文件中
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { //每个input对应的是一个K/V
// Prioritize immutable compaction work 优先immutable memtable的compact操作
if (has_imm_.load(std::memory_order_relaxed)) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != nullptr) {
CompactMemTable(); //将imm_写入磁盘中
// Wake up MakeRoomForWrite() if necessary.
background_work_finished_signal_.SignalAll(); //wakeup等待空间的线程
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}

Slice key = input->key();
//判断是否需要停止compaction,中途输出compaction的结果,避免compaction结果和level N+2 files有过多的重叠
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != nullptr) { ////与level + 2层的文件比较,如果目前的compact已经会导致后续level + 1 与 level + 2 compact压力过大
//那么结束本次compact
status = FinishCompactionOutputFile(compact, input); //写当前文件到磁盘
if (!status.ok()) {
break;
}
} //这里先把迭代器对应的key提取出来,因为在此之前我们可能以及遍历过多个key-value了,
//也就是可能已经将多个key-value写入到新的sstable中了。这里通过ShouldStopBefore函数判断是否符合生成一个新的sstable的条件,
//如果符合的话就将这个sstable写盘,如果不符合的话,就继续往里面加key-value。

这里要注意一下这个判断,其中的ShouldStopBefore()判断生成的SSTable和level + 2层的有重叠overlap的文件个数,如果超过10个,那么这个SSTable生成就完成了,这样保证了新产生的SSTable和上一层不会有过多的重叠。

1
2
3
4
5
6
7
8
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != nullptr) { ////与level + 2层的文件比较,如果目前的compact已经会导致后续level + 1 与 level + 2 compact压力过大
//那么结束本次compact
status = FinishCompactionOutputFile(compact, input); //写当前文件到磁盘
if (!status.ok()) {
break;
}
}

接着定义了一个bool型变量drop=false,用来指示是否该key是否需要丢弃。会将起一次出现的key的SequenceNumber设置为最大,可以保证第一个key不会被丢弃,如果上一个key的SequenceNumber<=最小存活的snapshot的sequencenumber,当遍历到下一个key时,其SequenceNumber一定小于该最小存活的snapshot的sequencenumber,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
else {
if (!has_current_user_key || //如果第一次碰到一个user key
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
0) {
// First occurrence of this user key 第一次出现的key,将seq设置为最大标记
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}
//因为第一次出现会将last seq设置为最大, 表示上一个key的关于seqnumber的比较结果
// 如果上一个Key的SequenceNumber <= 最小的存活的Snapshot,那么
// 这个Key的SequenceNumber一定 < 最小的存活的Snapshot,那么这个Key就不
// 会被任何线程看到了,可以被丢弃,上面碰到了第一个User Key时,设置了
// last_sequence_for_key = kMaxSequenceNumber;保证第一个Key一定不会
// 被丢弃。
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// 如果碰到了一个删除操作,并且SequenceNumber <= 最小的Snapshot,
// 通过IsBaseLevelForKey判断更高的Level不会有这个User Key存在,那么这个Key就被丢弃
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}

last_sequence_for_key = ikey.sequence;
}

ParseInternalKeyInternalKey是有所关联的,ParsedInternalKey是解析后的InternalKey

如果drop为false,表示没有被丢弃,就添加此key,一旦达到文件大小就实际写入文件,生成新文件。然后input->next,遍历下一个key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (!drop) {
// Open output file if necessary 打开输出文件
if (compact->builder == nullptr) {
status = OpenCompactionOutputFile(compact);
if (!status.ok()) {
break;
}
}
if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value()); //没有被丢弃就添加key

// Close output file if it is big enough // 达到文件大小,就写入文件,生成新文件
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) { //当前结果超过输出阈值2M
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
}
input->Next(); //遍历下一个KV对

DBImpl::DoCompactionWork构造了一个迭代器,开始多路归并的操作,会考虑以下几点:

1、迭代按照Internal Key的顺序进行,多个连续的Internal Key里面可能包含相同的User Key,按照SequenceNumber降序排列;

2、相同的User Key里只有第一个User Key是有效的,因为它的SequenceNumber是最大的,覆盖了旧的User Key,但是无法只保留第一个User Key,因为LevelDB支持多版本,旧的User Key可能依然有线程可以引用,但是不再引用的User Key可以安全的删除;

3、碰到一个删除时,并且它的SequenceNumber <= 最新的Snapshot,会判断更高Level是否有这个User Key存在。如果存在,那么无法丢弃这个删除操作,因为一旦丢弃了,更高Level原被删除的User Key又可见了。如果不存在,那么可以安全的丢弃这个删除操作,这个键就找不到了;最开始看到这个地方的时候有些不太理解,考虑到如何更高level存在的user key如果也是表示为删除的也就是deletion的,就不会出现上述的问题呀,也就是可以直接丢弃这个删除操作

如果这个InternalKey满足一下三个条件,则可以直接丢弃。*

1.是个Deletionkey。

2.sequence <= small_snaphshot。

3.当前compact的level是level-n和level-n+1,

如果在level-n+1以上的层已经没有此InternalKey对应的user_key了。

基于以上三种情况可删除。 为什么要此条件(IsBaseLevelForKey)判断呢?

举个例子: 如果在更高层,还有此InternalKey对应的User_key,此时你把当前这个InternalKey删除了,那就会出现两个问题:

问题1:再次读取删除的key时,就会读取到老的过期的key(这个key的type是非deletion),这是有问题的。

问题2:再次合并时,但这个key(这个key的type是非deletion)首次被读取时last_sequence_for_key会设置为kMaxSequenceNumber,这样就也不会丢弃。

以上两个问题好像在更高层的也就是旧的此key的所有userkey的type都是是delete的时候好像是没问题的,但这毕竟是少数,原则上为了系统正常运行,我们每次丢弃一个标记为kTypeDeletion的key时,必须保证数据库中不存在它的过期key,否则就得将它保留,直到后面它和这个过期的key合并为止,合并之后再丢弃。

4、对于生成的SSTable文件,设置两个上限,哪个先达到,都会开始新的SSTable。一个就是2MB,另外一个就是判断上一Level和这个文件的重叠的文件数量,不超过10个,这是为了控制这个生成的文件Compaction的时候,不会和太多的上层文件重叠。

最后通过DBImpl::InstallCompactionResults安装Compaction的结果:

3.5 InstallCompactionResults()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
mutex_.AssertHeld();
Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
compact->compaction->num_input_files(0), compact->compaction->level(),
compact->compaction->num_input_files(1), compact->compaction->level() + 1,
static_cast<long long>(compact->total_bytes));

// Add compaction outputs
// 将该删除的文件和该添加的文件更新到VersionEdit里
compact->compaction->AddInputDeletions(compact->compaction->edit());
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); //应用一次版本变更,安装新版本
}