LevelDB Study Notes: Version

This post covers the version management part of the LevelDB source code. These are my personal reading notes.

1. Version

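==Why is version management needed?==

If a read and a write hit the same record at the same time, the reader may see inconsistent or half-modified data. There are three common ways to handle this:

Pessimistic locking: the simplest approach is to protect the data with a lock, so that reads are blocked during a write and writes are blocked during a read. This is inefficient.

Optimistic locking: this assumes that concurrent transactions from multiple users will not interfere with each other, so each transaction can work on the data it touches without taking locks. Before committing an update, each transaction checks whether another transaction has modified the data since it was read. If so, the committing transaction rolls back. There is no lock contention and no deadlock, but efficiency still suffers when data conflicts are frequent.

MVCC: multiversion concurrency control is a concept widely used in databases. Every user performing an operation sees a snapshot of the database at a particular point in time, and a writer's unfinished changes are invisible to other users. An update does not overwrite the data in place; the old data is marked first and the new data is added elsewhere, which forms a new version. Readers that arrive afterwards see the latest version. This strategy therefore keeps multiple versions of the data, of which only one is the latest.

LevelDB implements multiversion concurrency control for sstables with Version.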

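  • There is only one current version, which holds the latest set of sstables.
  • A VersionEdit represents one update: which sstable files were added and which were deleted.

Put simply, a compaction adds and deletes files. A minor compaction dumps the immutable memtable to disk as an sstable. A major compaction merge-sorts N files into M new files. Version is what manages these files: it identifies which sstable files are the merged output to be written to the next level and which are historical files. After a compaction, the cleaned-up data goes into a new version, and the old data is eventually removed. However, if an sstable file belonging to an earlier version is currently being read, it cannot be deleted yet.

==Version management tracks the files on disk and keeps the data at every LevelDB level correct.==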
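The snapshot idea can be illustrated with a small sketch. This is not LevelDB's code: TinyVersionSet, Snapshot, and Install are hypothetical names, a version is reduced to a set of file numbers, and std::shared_ptr stands in for the manual reference counts that LevelDB uses. The sketch is single-threaded; a real implementation would guard current_ with a mutex.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <set>
#include <utility>

// Hypothetical stand-in for a version: an immutable set of sstable file numbers.
using FileSet = std::set<uint64_t>;

class TinyVersionSet {
 public:
  TinyVersionSet() : current_(std::make_shared<const FileSet>()) {}

  // A reader pins whichever version is current at this moment.
  std::shared_ptr<const FileSet> Snapshot() const { return current_; }

  // A writer copies the current version, applies the changes, and installs
  // the copy as the new current version. Older snapshots are never modified.
  void Install(const FileSet& added, const FileSet& deleted) {
    FileSet next = *current_;
    for (uint64_t f : deleted) next.erase(f);
    for (uint64_t f : added) next.insert(f);
    current_ = std::make_shared<const FileSet>(std::move(next));
  }

 private:
  std::shared_ptr<const FileSet> current_;
};
```

A reader that called Snapshot() before a compaction keeps seeing the old file set until it drops its pointer, while new readers see the new set.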

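2. Structure of VersionEdit

As noted above, MVCC for sstables is implemented with Version. How does one version transition to the next? In LevelDB this is done through VersionEdit.

As the name suggests, a VersionEdit edits or modifies a Version. It records the difference between two Versions:

Version0 + VersionEdit = Version1

Every compaction adds and deletes files, producing a new version on top of the existing one, that is, Version + Delta = New-Version.

In the LevelDB implementation, the class that manages the Delta is VersionEdit, and a given version is recorded by Version.

The "+=" step that applies a delta to a Version is implemented by the class Builder.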


2.1 Source walkthrough: VersionEdit

First, the member variables and member functions of VersionEdit:

private:
friend class VersionSet;

typedef std::set< std::pair<int, uint64_t> > DeletedFileSet;

std::string comparator_;
uint64_t log_number_;
uint64_t prev_log_number_;
uint64_t next_file_number_;
SequenceNumber last_sequence_;
bool has_comparator_;
bool has_log_number_;
bool has_prev_log_number_;
bool has_next_file_number_;
bool has_last_sequence_;

std::vector< std::pair<int, InternalKey> > compact_pointers_;
DeletedFileSet deleted_files_; // files to delete
// Newly added files; for example, a dumped immutable memtable is added to new_files_
std::vector< std::pair<int, FileMetaData> > new_files_; // files added by this operation

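new_files_ lists the files that the new version adds,

and deleted_files_ lists the files that the new version deletes.

A VersionEdit stores the level and metadata (FileMetaData) of each sstable produced by the compaction,
along with the level and file number of each sstable to be deleted (that is, the sstables that were compacted).

The transition from one version to the next is triggered by compaction.

To understand Version in depth, it helps to be clear about the FileMetaData structure: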

struct FileMetaData {
int refs;
int allowed_seeks; // Seeks allowed until compaction
uint64_t number; // Uniquely identifies an sstable; it is the number used in the file name
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table

FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { } // initialize the members
};

As the code above shows, the keys stored in an sstable are InternalKeys (user_key + sequence number + type).
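A minimal sketch of how such a key can be laid out, assuming the layout I understand LevelDB's db/dbformat.h to use: the user key followed by an 8-byte little-endian trailer holding (sequence << 8) | type. MakeInternalKey, TagOf, UserKeyOf, SequenceOf, and TypeOf are illustrative helper names, not LevelDB's API, and std::string stands in for Slice.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

enum ValueType : uint8_t { kTypeDeletion = 0x0, kTypeValue = 0x1 };

// Pack a 56-bit sequence number and an 8-bit type into one 64-bit tag.
uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
  assert(seq < (uint64_t{1} << 56));
  return (seq << 8) | static_cast<uint64_t>(t);
}

// Internal key = user key + tag encoded as 8 little-endian bytes.
std::string MakeInternalKey(const std::string& user_key, uint64_t seq,
                            ValueType t) {
  std::string result = user_key;
  const uint64_t tag = PackSequenceAndType(seq, t);
  for (int i = 0; i < 8; i++) {
    result.push_back(static_cast<char>((tag >> (8 * i)) & 0xff));
  }
  return result;
}

// Decode the 8-byte trailer back into the 64-bit tag.
uint64_t TagOf(const std::string& ikey) {
  assert(ikey.size() >= 8);
  uint64_t tag = 0;
  for (int i = 0; i < 8; i++) {
    const unsigned char byte =
        static_cast<unsigned char>(ikey[ikey.size() - 8 + i]);
    tag |= static_cast<uint64_t>(byte) << (8 * i);
  }
  return tag;
}

std::string UserKeyOf(const std::string& ikey) {
  return ikey.substr(0, ikey.size() - 8);
}
uint64_t SequenceOf(const std::string& ikey) { return TagOf(ikey) >> 8; }
ValueType TypeOf(const std::string& ikey) {
  return static_cast<ValueType>(TagOf(ikey) & 0xff);
}
```

Because the sequence number is part of the key, several versions of the same user key can coexist in the sstables, and smallest/largest in FileMetaData bound a file by internal key, not only by user key.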

// Add the specified file at the specified number.
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
// REQUIRES: "smallest" and "largest" are smallest and largest keys in file
// Records a {level, FileMetaData} pair in new_files_
void AddFile(int level, uint64_t file,
uint64_t file_size,
const InternalKey& smallest,
const InternalKey& largest) {
FileMetaData f;
f.number = file;
f.file_size = file_size;
f.smallest = smallest;
f.largest = largest;
new_files_.push_back(std::make_pair(level, f));
}

// Delete the specified "file" from the specified "level".
void DeleteFile(int level, uint64_t file) {
deleted_files_.insert(std::make_pair(level, file));
}

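The member function AddFile records a {level, FileMetaData} pair in new_files_ (a vector).

DeleteFile records that a particular file is to be deleted from a particular level. Newer LevelDB releases name this method RemoveFile, which is the name used in the snippets later in this post.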
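To make Version + VersionEdit = New-Version concrete, here is a simplified sketch of applying an edit. MiniEdit, MiniVersion, and ApplyEdit are hypothetical names, a file is reduced to its number, and the sorting by smallest key that a real builder would need is left out.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <utility>
#include <vector>

constexpr int kNumLevels = 7;

// Simplified stand-in for VersionEdit: files are identified by number only.
struct MiniEdit {
  std::set<std::pair<int, uint64_t>> deleted_files;  // {level, file number}
  std::vector<std::pair<int, uint64_t>> new_files;   // {level, file number}

  void AddFile(int level, uint64_t file) {
    new_files.emplace_back(level, file);
  }
  void RemoveFile(int level, uint64_t file) {
    deleted_files.insert({level, file});
  }
};

// Simplified stand-in for Version: one file list per level.
struct MiniVersion {
  std::vector<uint64_t> files[kNumLevels];
};

// Builds a new version from base and edit. The base version is not modified,
// so readers that still hold it keep a consistent view.
MiniVersion ApplyEdit(const MiniVersion& base, const MiniEdit& edit) {
  MiniVersion result;
  for (int level = 0; level < kNumLevels; level++) {
    for (uint64_t f : base.files[level]) {
      if (edit.deleted_files.count({level, f}) == 0) {
        result.files[level].push_back(f);  // keep files that are not deleted
      }
    }
  }
  for (const auto& nf : edit.new_files) {
    assert(nf.first >= 0 && nf.first < kNumLevels);
    result.files[nf.first].push_back(nf.second);
  }
  return result;
}
```

For a compaction of level 0 into level 1, the edit removes the input files at both levels and adds the output files at level 1.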

Definition of the Version class

First, the members of Version:

private:
friend class Compaction;
friend class VersionSet;

class LevelFileNumIterator;
Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;

// Call func(arg, level, f) for every file that overlaps user_key in
// order from newest to oldest. If an invocation of func returns
// false, makes no more calls.
//
// REQUIRES: user portion of internal_key == user_key.
void ForEachOverlapping(Slice user_key, Slice internal_key,
void* arg,
bool (*func)(void*, int, FileMetaData*));

VersionSet* vset_; // VersionSet to which this Version belongs; every Version belongs to one VersionSet
Version* next_; // Next version in linked list; next_ and prev_ link the Versions into a doubly-linked list
Version* prev_; // Previous version in linked list
int refs_; // Number of live refs to this version

// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels]; // one file list per level; each vector element is a FileMetaData*

// A compaction is triggered in two cases: file_to_compact_ != nullptr or compaction_score_ >= 1
// Next file to compact based on seek stats.
// The file and level of the next compaction, determined from allowed_seeks
FileMetaData* file_to_compact_;
int file_to_compact_level_;

// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().
// The score and level of the next compaction; score < 1 means there is no need to compact
// Computed in Finalize(), based on file size or file count
double compaction_score_;
int compaction_level_;

explicit Version(VersionSet* vset)
: vset_(vset), next_(this), prev_(this), refs_(0),
file_to_compact_(nullptr),
file_to_compact_level_(-1),
compaction_score_(-1),
compaction_level_(-1) {
}

~Version();

// No copying allowed
Version(const Version&);
void operator=(const Version&);

std::vector<FileMetaData*> files_[config::kNumLevels]; with kNumLevels = 7, the levels run from L0 to L6.

This array of vectors holds the metadata of every sstable file in the version.

Definition of the VersionSet class

class VersionSet {
public:
VersionSet(const std::string& dbname,
const Options* options,
TableCache* table_cache,
const InternalKeyComparator*);
~VersionSet();

// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
// current version. Will release *mu while actually writing to the file.
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(VersionEdit* edit, port::Mutex* mu)
EXCLUSIVE_LOCKS_REQUIRED(mu);

// Recover the last saved descriptor from persistent storage.
Status Recover(bool *save_manifest);

// Return the current version.
Version* current() const { return current_; }

// Return the current manifest file number
uint64_t ManifestFileNumber() const { return manifest_file_number_; }

// Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_++; }

// Arrange to reuse "file_number" unless a newer file number has
// already been allocated.
// REQUIRES: "file_number" was returned by a call to NewFileNumber().
void ReuseFileNumber(uint64_t file_number) {
if (next_file_number_ == file_number + 1) {
next_file_number_ = file_number;
}
}

// Return the number of Table files at the specified level.
int NumLevelFiles(int level) const; // a const member function cannot modify the object's member variables

// Return the combined file size of all files at the specified level.
int64_t NumLevelBytes(int level) const;

// Return the last sequence number.
uint64_t LastSequence() const { return last_sequence_; }

// Set the last sequence number to s.
void SetLastSequence(uint64_t s) {
assert(s >= last_sequence_);
last_sequence_ = s;
}

// Mark the specified file number as used.
void MarkFileNumberUsed(uint64_t number);

// Return the current log file number.
uint64_t LogNumber() const { return log_number_; }

// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
uint64_t PrevLogNumber() const { return prev_log_number_; }

// Pick level and inputs for a new compaction.
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
Compaction* PickCompaction();

// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
// level that overlaps the specified range. Caller should delete
// the result.
Compaction* CompactRange(
int level,
const InternalKey* begin,
const InternalKey* end);

// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
int64_t MaxNextLevelOverlappingBytes();

// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.
Iterator* MakeInputIterator(Compaction* c);

// Returns true iff some level needs a compaction.
bool NeedsCompaction() const {
Version* v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
}

// Add all files listed in any live version to *live.
// May also mutate some internal state.
void AddLiveFiles(std::set<uint64_t>* live);

// Return the approximate offset in the database of the data for
// "key" as of version "v".
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);

// Return a human-readable short (single-line) summary of the number
// of files per level. Uses *scratch as backing store.
struct LevelSummaryStorage {
char buffer[100];
};
const char* LevelSummary(LevelSummaryStorage* scratch) const;

private:
class Builder;

friend class Compaction;
friend class Version;

bool ReuseManifest(const std::string& dscname, const std::string& dscbase);

void Finalize(Version* v);

void GetRange(const std::vector<FileMetaData*>& inputs,
InternalKey* smallest,
InternalKey* largest);

void GetRange2(const std::vector<FileMetaData*>& inputs1,
const std::vector<FileMetaData*>& inputs2,
InternalKey* smallest,
InternalKey* largest);

void SetupOtherInputs(Compaction* c);

// Save current contents to *log
Status WriteSnapshot(log::Writer* log);

void AppendVersion(Version* v);

Env* const env_;
const std::string dbname_;
const Options* const options_;
TableCache* const table_cache_; // table cache
const InternalKeyComparator icmp_;
uint64_t next_file_number_; // next file number to allocate
uint64_t manifest_file_number_; // number of the current manifest file
uint64_t last_sequence_; // last sequence number, the one stored in internal keys
uint64_t log_number_; // log file number
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted

// Opened lazily
WritableFile* descriptor_file_;
log::Writer* descriptor_log_;
Version dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions_.prev_, the current version

// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];

// No copying allowed
VersionSet(const VersionSet&);
void operator=(const VersionSet&);
};

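LevelDB keeps all Versions in a doubly-linked list, that is, in one collection. Together the Versions form a structure called VersionSet.

The figure below shows how Version, VersionEdit, and the related structures fit together.

[Figure: structure of Version, VersionEdit, and related classes]

A Version holds the metadata of the sstable files at each level from L0 to L6, and the VersionSet maintains a circular doubly-linked list of Versions.

LevelDB triggers compactions, which clean up some files and make the data more ordered. The cleaned-up data goes into a new version, and the old data, having served as the input, is eventually removed. However, if a read is still in progress on an old file, that file cannot be deleted yet. LevelDB therefore uses reference counting: as long as a Version is alive, none of the files it manages may be deleted. When a Version's lifetime ends, the reference count of every file it manages is decremented by 1.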

Version::~Version() {
assert(refs_ == 0);

// Remove from linked list
prev_->next_ = next_;
next_->prev_ = prev_;

// Drop references to files
for (int level = 0; level < config::kNumLevels; level++) {
for (size_t i = 0; i < files_[level].size(); i++) {
FileMetaData* f = files_[level][i];
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
}

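This is the destructor of Version. When a Version is destroyed, the reference count of every file it manages is decremented by 1. When a count drops to 0 or below, the corresponding FileMetaData object is deleted.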
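The combination of a circular doubly-linked list with a dummy head and per-node reference counts can be sketched in isolation. Node and NodeList are hypothetical names that loosely mirror Version and VersionSet; file metadata is omitted and the sketch is single-threaded.

```cpp
#include <cassert>

// Hypothetical stand-in for Version: a ref-counted node in a circular list.
struct Node {
  Node* next;
  Node* prev;
  int refs;
  int id;

  // A fresh node points at itself, like Version's next_(this), prev_(this).
  explicit Node(int id_) : next(this), prev(this), refs(0), id(id_) {}

  void Ref() { ++refs; }

  // Drops one reference; unlinks and destroys the node when none remain.
  void Unref() {
    assert(refs >= 1);
    if (--refs == 0) {
      prev->next = next;
      next->prev = prev;
      delete this;
    }
  }
};

// Hypothetical stand-in for VersionSet: owns the dummy head and current_.
class NodeList {
 public:
  NodeList() : head_(0), current_(nullptr) {}
  NodeList(const NodeList&) = delete;
  NodeList& operator=(const NodeList&) = delete;
  ~NodeList() {
    if (current_ != nullptr) current_->Unref();
  }

  // Makes v the current node and appends it just before the dummy head.
  void Append(Node* v) {
    assert(v->refs == 0);
    if (current_ != nullptr) current_->Unref();
    current_ = v;
    v->Ref();
    v->prev = head_.prev;
    v->next = &head_;
    v->prev->next = v;
    v->next->prev = v;
  }

  Node* current() const { return current_; }

  // Number of live nodes, not counting the dummy head.
  int Count() const {
    int n = 0;
    for (const Node* p = head_.next; p != &head_; p = p->next) n++;
    return n;
  }

 private:
  Node head_;
  Node* current_;
};
```

An old node stays in the list only while something other than the list still references it; once the last reader calls Unref(), the node unlinks itself, which is the point at which the real Version destructor releases its files.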


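As mentioned earlier, Version + VersionEdit = New-Version; the actual operation is implemented by the class Builder.

[Figure: call graph around Builder, generated with Sourcetrail]

The call relationships can be inspected with Sourcetrail, a source-code reading tool. Builder is used in LogAndApply and Recover.

VersionEdit uses new_files_ and deleted_files_ to record the level and file number of the sstables to add and to delete.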

std::vector<std::pair<int, InternalKey>> compact_pointers_;
DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_;

deleted_files_ is populated in the RemoveFile function:

// Delete the specified "file" from the specified "level".
void RemoveFile(int level, uint64_t file) {
deleted_files_.insert(std::make_pair(level, file));
}


The AddInputDeletions function:

void Compaction::AddInputDeletions(VersionEdit* edit) {
for (int which = 0; which < 2; which++) {
for (size_t i = 0; i < inputs_[which].size(); i++) {
edit->RemoveFile(level_ + which, inputs_[which][i]->number);
}
}
}

The BackgroundCompaction function:

void DBImpl::BackgroundCompaction() { 
······
Status status;
if (c == nullptr) { // c is null: no file needs compaction, so there is nothing to do
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level
// If the compaction was not triggered manually, the input file at level does not overlap level+1,
// and its overlap with level+2 is at most kMaxGrandParentOverlapBytes = 10 * kTargetFileSize, move the file directly to level+1
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_); // apply the edit to produce a new version; analyzed below
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(), versions_->LevelSummary(&tmp));
} else {
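CompactionState* compact = new CompactionState(c); // c holds the metadata of the files to compact
status = DoCompactionWork(compact); // otherwise, call DoCompactionWork to run the compaction and write the output files
if (!status.ok()) {
RecordBackgroundError(status);
}
CleanupCompaction(compact); // clean up the temporary state left by the compaction
c->ReleaseInputs(); // release the compaction inputs
RemoveObsoleteFiles(); // delete files that are no longer referenced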
}
delete c;
}

Next, ==new_files_==:

// Add the specified file at the specified number.
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
// REQUIRES: "smallest" and "largest" are smallest and largest keys in file
void AddFile(int level, uint64_t file, uint64_t file_size,
const InternalKey& smallest, const InternalKey& largest) {
FileMetaData f;
f.number = file;
f.file_size = file_size;
f.smallest = smallest;
f.largest = largest;
new_files_.push_back(std::make_pair(level, f));
}

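In AddFile, new_files_ receives a pair via push_back (pair.first is the level, pair.second is the file's FileMetaData).

AddFile is called from WriteLevel0Table, BackgroundCompaction, and InstallCompactionResults.

In WriteLevel0Table, the new file is the one produced by dumping the memtable to level 0.

In BackgroundCompaction, AddFile handles the case where the file does not overlap the next level (a trivial move). InstallCompactionResults covers the remaining case and writes the results of a compaction to level+1: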

Status DBImpl::InstallCompactionResults(CompactionState* compact) {
mutex_.AssertHeld();
Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() + 1,
static_cast<long long>(compact->total_bytes));

// Add compaction outputs
compact->compaction->AddInputDeletions(compact->compaction->edit());
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
// Add each newly generated file to the edit at level + 1
compact->compaction->edit()->AddFile(
level + 1,
out.number, out.file_size, out.smallest, out.largest);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}


LogAndApply

Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
// Attach a log_number to this edit
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
} else {
edit->SetLogNumber(log_number_);
}

if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}

edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);

// Apply the edit to the current version to build a new Version
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v); // compute compaction_level_ and compaction_score_ for the next compaction

// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
std::string new_manifest_file; // name of the new manifest file, if one is created
Status s;
if (descriptor_log_ == nullptr) {
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == nullptr);
// File name of the form MANIFEST-xxxxxx; new_manifest_file is the name of the current manifest file
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_); // create the file
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
// Write a snapshot of current_ to the manifest
s = WriteSnapshot(descriptor_log_);
}
}

// Unlock during expensive MANIFEST log write
{
mu->Unlock();

// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
edit->EncodeTo(&record); // encode the contents of the edit into record
// Append this edit to the manifest
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync(); // flush to the device
}
if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
}
}

// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
// Write manifest_file_number_ to the CURRENT file
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}

mu->Lock();
}

// Install the new version
if (s.ok()) {
AppendVersion(v); // insert the version into the list and update current_
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
delete v;
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = nullptr;
descriptor_file_ = nullptr;
env_->DeleteFile(new_manifest_file);
}
}

return s;
}

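In summary, LogAndApply does the following:

  1. Applies the edit to current_ to produce a new Version
  2. Computes, for the new Version, the level for the next major compaction
  3. Updates the metadata files (MANIFEST and CURRENT)
  4. Appends the new Version to the VersionSet's doubly-linked list and sets current_ to the new Version

The first step is to build the new Version: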

Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}

Next, Finalize is called to compute the level to process in the next major compaction.

void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;

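// Level 0 is scored by file count, to reduce seeks and improve read performance: count / 4
// Levels > 0 are scored by total file size, to limit disk usage: size / (10^level MB)
// For example:
// level 0 has 4 files, score = 1.0
// level 1 holds 9 MB of files, score = 0.9
// so the level to compact is 0, with score = 1.0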
for (int level = 0; level < config::kNumLevels-1; level++) {
double score;
if (level == 0) {
// We treat level-0 specially by bounding the number of files
// instead of number of bytes for two reasons:
//
// (1) With larger write-buffer sizes, it is nice not to do too
// many level-0 compactions.
//
// (2) The files in level-0 are merged on every read and
// therefore we wish to avoid too many files when the individual
// file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger);
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score =
static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
}

if (score > best_score) {
best_level = level;
best_score = score;
}
}

v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}
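The scoring rule can be tried out on its own with a small sketch. It assumes the constants implied by the comments above: a level-0 trigger of 4 files and a size limit of 10 MB for level 1 that grows tenfold per level. LevelStats and PickCompactionLevel are hypothetical names; the real Finalize reads the file lists of a Version instead.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

constexpr int kNumLevels = 7;
constexpr int kL0CompactionTrigger = 4;  // assumed level-0 file-count trigger

// Size limit per level: 10 MB for level 1, ten times more for each level above.
double MaxBytesForLevel(int level) {
  double result = 10.0 * 1048576.0;
  while (level > 1) {
    result *= 10;
    level--;
  }
  return result;
}

// Hypothetical per-level summary used in place of the real file lists.
struct LevelStats {
  size_t num_files;
  uint64_t total_bytes;
};

// Returns {best_level, best_score}, scoring level 0 by file count and
// the other levels by total size. The last level is never a candidate.
std::pair<int, double> PickCompactionLevel(
    const std::vector<LevelStats>& levels) {
  int best_level = -1;
  double best_score = -1;
  const int n = static_cast<int>(levels.size());
  for (int level = 0; level < kNumLevels - 1 && level < n; level++) {
    double score;
    if (level == 0) {
      score = levels[level].num_files /
              static_cast<double>(kL0CompactionTrigger);
    } else {
      score = static_cast<double>(levels[level].total_bytes) /
              MaxBytesForLevel(level);
    }
    if (score > best_score) {
      best_level = level;
      best_score = score;
    }
  }
  return {best_level, best_score};
}
```

With 4 files at level 0 and 9 MB at level 1 the result is level 0 with score 1.0; with 2 files at level 0 and 20 MB at level 1 it is level 1 with score 2.0.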

Next, the manifest is updated. If a new manifest file has to be created, a snapshot of current_ is written to it first:

// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
std::string new_manifest_file;
Status s;
if (descriptor_log_ == nullptr) {
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == nullptr);
// File name of the form MANIFEST-xxxxxx
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
// Write a snapshot of current_ to the manifest
s = WriteSnapshot(descriptor_log_);
}
}

Then the edit is written:

// Unlock during expensive MANIFEST log write
{
mu->Unlock();

// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
// Append this edit to the manifest
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
}
}

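The manifest is now up to date. Note that it uses the same record format as the log.

Next, the manifest file name is written to the CURRENT file in plain text.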

// Write manifest_file_number_ to the CURRENT file
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}

Both files can be found in the LevelDB database directory, for example MANIFEST-000004 and CURRENT.
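As a rough illustration of that naming scheme, the sketch below formats a manifest name and the plain-text contents of CURRENT. It assumes that the name is "MANIFEST-" plus the file number zero-padded to six digits, and that CURRENT holds that name followed by a newline. ManifestBaseName, CurrentFileContents, and ParseCurrent are hypothetical helpers, not LevelDB functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <string>

// "MANIFEST-" followed by the file number, zero-padded to six digits.
std::string ManifestBaseName(uint64_t number) {
  char buf[32];
  std::snprintf(buf, sizeof(buf), "MANIFEST-%06llu",
                static_cast<unsigned long long>(number));
  return buf;
}

// Assumed contents of CURRENT: the manifest name plus a trailing newline.
std::string CurrentFileContents(uint64_t manifest_number) {
  return ManifestBaseName(manifest_number) + "\n";
}

// Recovers the manifest number from the contents of CURRENT.
// Returns false if the contents do not match the expected shape.
bool ParseCurrent(const std::string& contents, uint64_t* number) {
  const std::string prefix = "MANIFEST-";
  if (contents.size() <= prefix.size() + 1 || contents.back() != '\n') {
    return false;
  }
  if (contents.compare(0, prefix.size(), prefix) != 0) return false;
  uint64_t n = 0;
  for (size_t i = prefix.size(); i + 1 < contents.size(); i++) {
    const char c = contents[i];
    if (c < '0' || c > '9') return false;
    n = n * 10 + static_cast<uint64_t>(c - '0');
  }
  *number = n;
  return true;
}
```

On recovery, reading CURRENT is enough to find which manifest to replay, which is why CURRENT is only rewritten after the new manifest has been written and synced.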

Finally, AppendVersion(v) is called to add the new version to the linked list and update current_:

// Add v to the linked list
void VersionSet::AppendVersion(Version* v) {
// Make "v" current
assert(v->refs_ == 0);
assert(v != current_);
if (current_ != nullptr) {
current_->Unref();
}
current_ = v;
v->Ref();

// Append to linked list
v->prev_ = dummy_versions_.prev_;
v->next_ = &dummy_versions_;
v->prev_->next_ = v;
v->next_->prev_ = v;
}

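This completes the whole process of applying an edit.

The corresponding on-disk operations are shown in the figure below:

[Figure: manifest]

Summary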