// Metadata describing one table (sstable) file.
// NOTE: excerpt — the remainder of the struct (constructor, closing brace) is
// not shown here.
struct FileMetaData {
  int refs;  // Reference count; NOTE(review): presumably how many Versions hold this file — confirm
  int allowed_seeks;     // Seeks allowed until compaction
  uint64_t number;       // Uniquely identifies the sstable; also the number used in its file name
  uint64_t file_size;    // File size in bytes
  InternalKey smallest;  // Smallest internal key served by table
  InternalKey largest;   // Largest internal key served by table
// Add the specified file at the specified number. // REQUIRES: This version has not been saved (see VersionSet::SaveTo) // REQUIRES: "smallest" and "largest" are smallest and largest keys in file // 记录{level, FileMetaData}对到new_files_ voidAddFile(int level, uint64_t file, uint64_t file_size, const InternalKey& smallest, const InternalKey& largest){ FileMetaData f; f.number = file; f.file_size = file_size; f.smallest = smallest; f.largest = largest; new_files_.push_back(std::make_pair(level, f)); }
// Delete the specified "file" from the specified "level". voidDeleteFile(int level, uint64_t file){ deleted_files_.insert(std::make_pair(level, file)); }
// Forward declaration; NOTE(review): presumably the file-list iterator that
// NewConcatenatingIterator builds on — confirm.
class LevelFileNumIterator;
// Returns an iterator over the files of the given level.
// NOTE(review): presumably concatenates the per-file iterators in key order —
// confirm against the definition.
Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;

// Call func(arg, level, f) for every file that overlaps user_key in
// order from newest to oldest. If an invocation of func returns
// false, makes no more calls.
//
// REQUIRES: user portion of internal_key == user_key.
void ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
                        bool (*func)(void*, int, FileMetaData*));
VersionSet* vset_;  // VersionSet to which this Version belongs
Version* next_;     // Next version in linked list (next_/prev_ chain Versions into a doubly-linked list)
Version* prev_;     // Previous version in linked list
int refs_;          // Number of live refs to this version

// List of files per level: files_[level] holds pointers to the FileMetaData
// of every file in that level.
std::vector<FileMetaData*> files_[config::kNumLevels];

// A compaction is triggered when either file_to_compact_ != nullptr or
// compaction_score_ >= 1 (see VersionSet::NeedsCompaction).
// Next file to compact based on seek stats.
// File (and its level) to compact next, chosen from allowed_seeks accounting.
FileMetaData* file_to_compact_;
int file_to_compact_level_;

// Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields
// are initialized by Finalize().
// Finalize() scores level 0 by file count and the other levels by total bytes.
double compaction_score_;
int compaction_level_;
// Apply *edit to the current version to form a new descriptor that // is both saved to persistent state and installed as the new // current version. Will release *mu while actually writing to the file. // REQUIRES: *mu is held on entry. // REQUIRES: no other thread concurrently calls LogAndApply() Status LogAndApply(VersionEdit* edit, port::Mutex* mu) EXCLUSIVE_LOCKS_REQUIRED(mu);
// Recover the last saved descriptor from persistent storage. Status Recover(bool *save_manifest);
// Return the current version. Version* current()const{ return current_; }
// Return the current manifest file number uint64_tManifestFileNumber()const{ return manifest_file_number_; }
// Allocate and return a new file number uint64_tNewFileNumber(){ return next_file_number_++; }
// Arrange to reuse "file_number" unless a newer file number has // already been allocated. // REQUIRES: "file_number" was returned by a call to NewFileNumber(). voidReuseFileNumber(uint64_t file_number){ if (next_file_number_ == file_number + 1) { next_file_number_ = file_number; } }
// Return the number of Table files at the specified level. intNumLevelFiles(int level)const; //const修饰成员函数表明不能改变类的成员变量
// Return the combined file size of all files at the specified level. int64_tNumLevelBytes(int level)const;
// Return the last sequence number. uint64_tLastSequence()const{ return last_sequence_; }
// Set the last sequence number to s. voidSetLastSequence(uint64_t s){ assert(s >= last_sequence_); last_sequence_ = s; }
// Mark the specified file number as used. voidMarkFileNumberUsed(uint64_t number);
// Return the current log file number. uint64_tLogNumber()const{ return log_number_; }
// Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. uint64_tPrevLogNumber()const{ return prev_log_number_; }
// Pick level and inputs for a new compaction. // Returns nullptr if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that // describes the compaction. Caller should delete the result. Compaction* PickCompaction();
// Return a compaction object for compacting the range [begin,end] in // the specified level. Returns nullptr if there is nothing in that // level that overlaps the specified range. Caller should delete // the result. Compaction* CompactRange( int level, const InternalKey* begin, const InternalKey* end);
// Return the maximum overlapping data (in bytes) at next level for any // file at a level >= 1. int64_tMaxNextLevelOverlappingBytes();
// Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed. Iterator* MakeInputIterator(Compaction* c);
// Returns true iff some level needs a compaction. boolNeedsCompaction()const{ Version* v = current_; return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr); }
// Add all files listed in any live version to *live.
// May also mutate some internal state.
void AddLiveFiles(std::set<uint64_t>* live);

// Return the approximate offset in the database of the data for
// "key" as of version "v".
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);

// Return a human-readable short (single-line) summary of the number
// of files per level. Uses *scratch as backing store.
struct LevelSummaryStorage {
  char buffer[100];  // Fixed-size backing store for the summary text.
};
// NOTE(review): the returned pointer presumably points into *scratch, so it is
// only valid while *scratch is alive — confirm against the definition.
const char* LevelSummary(LevelSummaryStorage* scratch) const;

// Save current contents to *log
Status WriteSnapshot(log::Writer* log);

// Inserts v into the list of versions and makes it current_.
void AppendVersion(Version* v);
Env* const env_;
const std::string dbname_;
const Options* const options_;
TableCache* const table_cache_;  // Table cache
const InternalKeyComparator icmp_;
uint64_t next_file_number_;      // Next number NewFileNumber() will hand out
uint64_t manifest_file_number_;  // Number of the current MANIFEST file
uint64_t last_sequence_;         // Last sequence number (the sequence number carried in internal keys)
uint64_t log_number_;            // Current log file number
uint64_t prev_log_number_;  // 0 or backing store for memtable being compacted

// Opened lazily
WritableFile* descriptor_file_;
log::Writer* descriptor_log_;
Version dummy_versions_;  // Head of circular doubly-linked list of versions.
Version* current_;        // == dummy_versions_.prev_ (the current version)

// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];
// No copying allowed VersionSet(const VersionSet&); voidoperator=(const VersionSet&); };
// Delete the specified "file" from the specified "level". void RemoveFile(int level, uint64_t file) { deleted_files_.insert(std::make_pair(level, file)); }
AddInputDeletions函数:
voidCompaction::AddInputDeletions(VersionEdit* edit){ for (int which = 0; which < 2; which++) { for (size_t i = 0; i < inputs_[which].size(); i++) { edit->RemoveFile(level_ + which, inputs_[which][i]->number); } } }
// Runs one background compaction. Excerpt: the part that chooses the
// compaction `c` (and sets is_manual) is elided in the source article.
void DBImpl::BackgroundCompaction() {
  // NOTE: the following line is the article's elision marker, not real code.
  ······
  Status status;
  if (c == nullptr) {
    // c == nullptr: no files need compacting, so there is nothing to do.
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    // Automatic (not manually requested) compaction that IsTrivialMove()
    // accepts. Per the article, that means the single input file does not
    // overlap level+1 and overlaps level+2 by at most
    // kMaxGrandParentOverlapBytes (10 * kTargetFileSize). Instead of rewriting
    // the file, it is simply moved to level+1.
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    // Express the move as "remove from level, add to level+1" in one edit.
    c->edit()->RemoveFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
                       f->largest);
    // Log the edit to the MANIFEST and install the resulting Version.
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    VersionSet::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number), c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(), versions_->LevelSummary(&tmp));
  } else {
    // c carries the metadata of the input files to compact.
    CompactionState* compact = new CompactionState(c);
    // Otherwise run a real compaction via DoCompactionWork, which writes new
    // output files.
    status = DoCompactionWork(compact);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);  // Release the per-compaction temporary state.
    c->ReleaseInputs();          // Release the compaction's input references.
    RemoveObsoleteFiles();       // Delete files that are no longer referenced.
  }
  delete c;
}
再来看 `new_files_`:
// Add the specified file at the specified number. // REQUIRES: This version has not been saved (see VersionSet::SaveTo) // REQUIRES: "smallest" and "largest" are smallest and largest keys in file voidAddFile(int level, uint64_t file, uint64_t file_size, const InternalKey& smallest, const InternalKey& largest){ FileMetaData f; f.number = file; f.file_size = file_size; f.smallest = smallest; f.largest = largest; new_files_.push_back(std::make_pair(level, f)); }
// Build the new version by applying *edit on top of current_.
Version* v = new Version(this);
{
  Builder builder(this, current_);
  builder.Apply(edit);
  builder.SaveTo(v);
}
// Compute v's compaction_level_ and compaction_score_ for the next compaction.
Finalize(v);

// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
// new_manifest_file stays empty unless a new MANIFEST is created below; later
// code tests it to decide whether to update CURRENT or discard the new file.
std::string new_manifest_file;
Status s;
if (descriptor_log_ == nullptr) {
  // No reason to unlock *mu here since we only hit this path in the
  // first call to LogAndApply (when opening the database).
  assert(descriptor_file_ == nullptr);
  // Name of the new MANIFEST file (of the form MANIFEST-xxxxxx) for
  // manifest_file_number_.
  new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
  edit->SetNextFile(next_file_number_);
  s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);  // Create the file.
  if (s.ok()) {
    descriptor_log_ = new log::Writer(descriptor_file_);
    // Start the new MANIFEST with a snapshot of current_.
    s = WriteSnapshot(descriptor_log_);
  }
}

// Unlock during expensive MANIFEST log write
{
  mu->Unlock();

  // Write new record to MANIFEST log
  if (s.ok()) {
    std::string record;
    edit->EncodeTo(&record);  // Serialize the edit into record.
    // Append this edit's record to the MANIFEST.
    s = descriptor_log_->AddRecord(record);
    if (s.ok()) {
      s = descriptor_file_->Sync();  // Sync the MANIFEST file to storage.
    }
    if (!s.ok()) {
      Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
    }
  }

  // If we just created a new descriptor file, install it by writing a
  // new CURRENT file that points to it.
  // (CURRENT is made to refer to manifest_file_number_.)
  if (s.ok() && !new_manifest_file.empty()) {
    s = SetCurrentFile(env_, dbname_, manifest_file_number_);
  }

  mu->Lock();  // Re-acquire *mu before touching VersionSet state again.
}

// Install the new version
if (s.ok()) {
  AppendVersion(v);  // Insert v into the version list and make it current_.
  log_number_ = edit->log_number_;
  prev_log_number_ = edit->prev_log_number_;
} else {
  // Failure: discard v, and if this call created a new MANIFEST, close it
  // and delete the file.
  delete v;
  if (!new_manifest_file.empty()) {
    delete descriptor_log_;
    delete descriptor_file_;
    descriptor_log_ = nullptr;
    descriptor_file_ = nullptr;
    env_->DeleteFile(new_manifest_file);
  }
}

return s;
}
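总结来看,LogAndApply 的主要作用是:
将 edit 应用于 current_,生成一个新的 Version
调用 Finalize 计算新 Version 下一次 major compaction 的 level 和 score
将 edit 写入 MANIFEST 文件(首次调用时会先创建新的 MANIFEST、写入当前版本快照,并更新 CURRENT 文件指向它)
将新 Version 添加到 VersionSet 的双向链表中,并令 current_ 指向新 Version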
首先是生成新Version:
// Build the new Version: a Builder seeded with current_ applies *edit, then
// saves the combined result into v.
Version* v = new Version(this);
{
  Builder builder(this, current_);
  builder.Apply(edit);
  builder.SaveTo(v);
}
// Scores each level of the freshly built version v to decide which level
// should be compacted next (excerpt: the rest of the function is not shown).
void VersionSet::Finalize(Version* v) {
  // Precomputed best level for next compaction
  int best_level = -1;
  double best_score = -1;

  // Level 0 is scored by file count, since every level-0 file may be consulted
  // on a read: score = #files / kL0_CompactionTrigger.
  // Levels > 0 are scored by total bytes: score = level_bytes /
  // MaxBytesForLevel(options_, level). In stock LevelDB that limit is 10 MB
  // for level 1 and grows 10x per level (10^level MB), not (10 MB)^level —
  // confirm against MaxBytesForLevel.
  // Example (kL0_CompactionTrigger = 4, the LevelDB default):
  //   level 0 has 4 files  -> score = 1.0
  //   level 1 holds 9 MB   -> score = 0.9
  // so level 0 (score 1.0) would be the level chosen for compaction.
  // The last level is not scored: compaction merges level L into L+1, and
  // there is no level below the last one.
  for (int level = 0; level < config::kNumLevels-1; level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      score = v->files_[level].size() /
          static_cast<double>(config::kL0_CompactionTrigger);
    } else {
      // Compute the ratio of current size to size limit.
      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
      score =
          static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
    }
// Initialize new descriptor log file if necessary by creating // a temporary file that contains a snapshot of the current version. std::string new_manifest_file; Status s; if (descriptor_log_ == nullptr) { // No reason to unlock *mu here since we only hit this path in the // first call to LogAndApply (when opening the database). assert(descriptor_file_ == nullptr); //形如MANIFEST-xxxxxx的文件名 new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); edit->SetNextFile(next_file_number_); s = env_->NewWritableFile(new_manifest_file, &descriptor_file_); if (s.ok()) { descriptor_log_ = newlog::Writer(descriptor_file_); // manifest写入current_的信息 s = WriteSnapshot(descriptor_log_); } }
然后将 edit 写入 MANIFEST:
// Unlock during expensive MANIFEST log write { mu->Unlock();
// Write new record to MANIFEST log if (s.ok()) { std::string record; edit->EncodeTo(&record); // manifest写入本次edit的信息 s = descriptor_log_->AddRecord(record); if (s.ok()) { s = descriptor_file_->Sync(); } if (!s.ok()) { Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str()); } }