void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (background_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
             !versions_->NeedsCompaction()) {
    // No work to be done.
    // This check prevents unbounded rescheduling: if no further
    // compaction is needed, the reschedule loop terminates here.
  } else {
    background_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}
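For reference, BGWork is just the static trampoline handed to Env::Schedule; it forwards to BackgroundCall on the DBImpl instance:

void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}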
void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(background_compaction_scheduled_);
  if (shutting_down_.load(std::memory_order_acquire)) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();  // the core of the background work
  }

  background_compaction_scheduled_ = false;

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  background_work_finished_signal_.SignalAll();
}
void DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != nullptr);

  // Save the contents of the memtable as a new Table
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  Status s = WriteLevel0Table(imm_, &edit, base);  // write the data out to L0
  base->Unref();

  if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
    s = Status::IOError("Deleting DB during memtable compaction");
  }

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    // Apply the generated VersionEdit to the current VersionSet.
    s = versions_->LogAndApply(&edit, &mutex_);
  }

  if (s.ok()) {
    // Commit to the new state
    imm_->Unref();  // drop our reference; imm_ is deleted once its count hits 0
    imm_ = nullptr;
    has_imm_.store(false, std::memory_order_release);
    RemoveObsoleteFiles();
  } else {
    RecordBackgroundError(s);
  }
}
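The Ref/Unref pair above is plain reference counting on Version objects. A minimal sketch of the idea (the real Version::Unref additionally asserts the version is not the dummy head of the VersionSet's version list):

void Version::Ref() { ++refs_; }

void Version::Unref() {
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
    delete this;  // last reader is gone; the version can be reclaimed
  }
}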
// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
int level = 0;
if (s.ok() && meta.file_size > 0) {
  const Slice min_user_key = meta.smallest.user_key();
  const Slice max_user_key = meta.largest.user_key();
  if (base != nullptr) {
    // Choose which level the new sstable should be written to.
    level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
  }
  // Record this sstable's FileMetaData in the VersionEdit, which will
  // be logged to the manifest file.
  edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
                meta.largest);
}
Status BuildTable(const std::string& dbname, Env* env, const Options& options,
                  TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
  Status s;
  meta->file_size = 0;
  iter->SeekToFirst();

  std::string fname = TableFileName(dbname, meta->number);
  if (iter->Valid()) {
    WritableFile* file;
    s = env->NewWritableFile(fname, &file);
    if (!s.ok()) {
      return s;
    }

    TableBuilder* builder = new TableBuilder(options, file);  // new sstable
    meta->smallest.DecodeFrom(iter->key());  // the first key is the smallest
    Slice key;
    for (; iter->Valid(); iter->Next()) {
      key = iter->key();
      builder->Add(key, iter->value());  // append the key to the sstable
    }
    if (!key.empty()) {
      meta->largest.DecodeFrom(key);  // the last key is the largest
    }

    // Finish and check for builder errors.  Finish() writes out the
    // remaining blocks: filter block, metaindex block, and index block.
    s = builder->Finish();
    if (s.ok()) {
      meta->file_size = builder->FileSize();
      assert(meta->file_size > 0);
    }
    delete builder;

    // Finish and check for file errors
    if (s.ok()) {
      s = file->Sync();  // flush to stable storage
    }
    if (s.ok()) {
      s = file->Close();
    }
    delete file;
    file = nullptr;

    if (s.ok()) {
      // Verify that the table is usable
      Iterator* it =
          table_cache->NewIterator(ReadOptions(), meta->number, meta->file_size);
      s = it->status();
      delete it;
    }
  }

  // Check for input iterator errors
  if (!iter->status().ok()) {
    s = iter->status();
  }

  if (s.ok() && meta->file_size > 0) {
    // Keep it
  } else {
    env->RemoveFile(fname);
  }
  return s;
}
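A rough sketch of what TableFileName produces (the real helper lives in filename.cc; SketchTableFileName is a hypothetical stand-in): table files are named by their 6-digit file number, with the ".ldb" suffix in modern LevelDB builds.

#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical stand-in for TableFileName, for illustration only.
std::string SketchTableFileName(const std::string& dbname, uint64_t number) {
  char buf[64];
  std::snprintf(buf, sizeof(buf), "/%06llu.ldb",
                static_cast<unsigned long long>(number));
  return dbname + buf;  // e.g. "testdb/000123.ldb"
}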
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap.  We try to push to level 2 to avoid the
// relatively expensive level 0=>1 compactions and to avoid some
// expensive manifest file operations.  We do not push all the way to
// the largest level since that can generate a lot of wasted disk
// space if the same key space is being repeatedly overwritten.
static const int kMaxMemCompactLevel = 2;
int Version::PickLevelForMemTableOutput(const Slice& smallest_user_key,
                                        const Slice& largest_user_key) {
  int level = 0;
  // If the new sstable overlaps files in L0, it must stay in L0.
  if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
    // Push to next level if there is no overlap in next level,
    // and the #bytes overlapping in the level after that are limited.
    InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
    std::vector<FileMetaData*> overlaps;
    // The while loop searches for a suitable target level; level 2
    // (kMaxMemCompactLevel) is the highest candidate, never more.
    while (level < config::kMaxMemCompactLevel) {
      if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
        break;
      }
      if (level + 2 < config::kNumLevels) {  // kNumLevels == 7
        // Check that file does not overlap too many grandparent bytes.
        GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
        const int64_t sum = TotalFileSize(overlaps);
        if (sum > MaxGrandParentOverlapBytes(vset_->options_)) {
          break;
        }
      }
      level++;
    }
  }
  return level;
}
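The grandparent-overlap threshold used above is derived from the target file size; in the LevelDB source (version_set.cc) it is simply ten target files, so with the default max_file_size of 2 MB the cutoff is 20 MB:

static size_t TargetFileSize(const Options* options) {
  return options->max_file_size;  // 2 MB by default
}

// Maximum bytes of overlaps in grandparent (i.e., level+2) before we
// stop building a single file in a level->level+1 compaction.
static int64_t MaxGrandParentOverlapBytes(const Options* options) {
  return 10 * TargetFileSize(options);
}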
// Add the specified file at the specified number.
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
// REQUIRES: "smallest" and "largest" are smallest and largest keys in file
void AddFile(int level, uint64_t file, uint64_t file_size,
             const InternalKey& smallest, const InternalKey& largest) {
  FileMetaData f;
  f.number = file;
  f.file_size = file_size;
  f.smallest = smallest;
  f.largest = largest;
  new_files_.push_back(std::make_pair(level, f));
}
void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (background_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
             !versions_->NeedsCompaction()) {
    // No work to be done.
    // This check prevents unbounded rescheduling: if no further
    // compaction is needed, the reschedule loop terminates here.
  } else {
    background_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}
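The versions_->NeedsCompaction() call consults state that Finalize() (below) precomputes whenever a new version is installed; from version_set.h:

// Returns true iff some level needs a compaction.
bool NeedsCompaction() const {
  Version* v = current_;
  return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
}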
void VersionSet::Finalize(Version* v) {
  // Precomputed best level for next compaction
  int best_level = -1;
  double best_score = -1;

  for (int level = 0; level < config::kNumLevels - 1; level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      score = v->files_[level].size() /
              static_cast<double>(config::kL0_CompactionTrigger);
    } else {
      // Compute the ratio of current size to size limit.
      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
      score =
          static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
    }

    if (score > best_score) {
      best_level = level;
      best_score = score;
    }
  }

  v->compaction_level_ = best_level;
  v->compaction_score_ = best_score;
}
// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;
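For context, the neighboring level-0 thresholds defined alongside it in db/dbformat.h:

// Soft limit on number of level-0 files.  We slow down writes at this point.
static const int kL0_SlowdownWritesTrigger = 8;

// Maximum number of level-0 files.  We stop writes at this point.
static const int kL0_StopWritesTrigger = 12;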
if (level == 0) {
  // We treat level-0 specially by bounding the number of files
  // instead of number of bytes for two reasons:
  //
  // (1) With larger write-buffer sizes, it is nice not to do too
  // many level-0 compactions.
  //
  // (2) The files in level-0 are merged on every read and
  // therefore we wish to avoid too many files when the individual
  // file size is small (perhaps because of a small write-buffer
  // setting, or very high compression ratios, or lots of
  // overwrites/deletions).
  score = v->files_[level].size() /
          static_cast<double>(config::kL0_CompactionTrigger);
}
else {
  // Compute the ratio of current size to size limit.
  const uint64_t level_bytes = TotalFileSize(v->files_[level]);
  score = static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
}
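A quick numeric illustration of the scoring, using hypothetical level sizes and the default settings:

// Hypothetical scores:
//   L0: 6 files                   -> score = 6 / 4    = 1.5
//   L1: 15 MB (limit 10 MB)       -> score = 15 / 10  = 1.5
//   L2: 30 MB (limit 100 MB)      -> score = 30 / 100 = 0.3
// Finalize() keeps the highest score and its level; PickCompaction()
// later fires a size compaction whenever that best score >= 1.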
static double MaxBytesForLevel(const Options* options, int level) {
  // Note: the result for level zero is not really used since we set
  // the level-0 compaction threshold based on number of files.

  // Result for both level-0 and level-1
  double result = 10. * 1048576.0;  // 10 MB by default
  while (level > 1) {
    result *= 10;
    level--;
  }
  return result;
}
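Unrolling the loop gives the familiar 10x ladder of level size limits; a tiny standalone check:

#include <cstdio>

// Recomputes the per-level byte limits exactly as MaxBytesForLevel does.
// Prints: L1 10 MB, L2 100 MB, L3 1000 MB, ... L6 1000000 MB (~1 TB).
int main() {
  double result = 10.0 * 1048576.0;  // 10 MB for L0/L1
  for (int level = 1; level < 7; ++level) {
    std::printf("L%d limit: %.0f MB\n", level, result / 1048576.0);
    result *= 10.0;
  }
  return 0;
}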
Compaction* VersionSet::PickCompaction() {
  Compaction* c;
  int level;
  // We prefer compactions triggered by too much data in a level over
  // the compactions triggered by seeks.
  const bool size_compaction = (current_->compaction_score_ >= 1);
  const bool seek_compaction = (current_->file_to_compact_ != nullptr);
  if (size_compaction) {
    // Size compaction is the first case: the per-level score decides
    // whether to start a compaction and which level to start it from.
    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level + 1 < config::kNumLevels);
    c = new Compaction(options_, level);

    // Pick the first file that comes after compact_pointer_[level]
    for (size_t i = 0; i < current_->files_[level].size(); i++) {
      FileMetaData* f = current_->files_[level][i];
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
        c->inputs_[0].push_back(f);
        break;
      }
    }
    if (c->inputs_[0].empty()) {
      // Wrap-around to the beginning of the key space
      c->inputs_[0].push_back(current_->files_[level][0]);
    }
  } else if (seek_compaction) {
    // Seek compaction is the second case: a file has absorbed too many
    // wasted seeks, so that file and its level seed the compaction.
    level = current_->file_to_compact_level_;
    c = new Compaction(options_, level);
    c->inputs_[0].push_back(current_->file_to_compact_);
  } else {
    return nullptr;
  }
  // Files in level 0 may overlap each other, so pick up all overlapping ones
  if (level == 0) {
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    // Note that the next call will discard the file we placed in
    // c->inputs_[0] earlier and replace it with an overlapping set
    // which will include the picked file.
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }
  // Get the entire range covered by the compaction: the key range
  // [all_start, all_limit] spanned by all files in inputs_[0] and inputs_[1].
  InternalKey all_start, all_limit;
  GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
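For reference, the two range helpers simply scan the file lists for the extreme keys; GetRange2 concatenates both inputs and defers to GetRange:

// Stores the smallest/largest internal keys across all files in "inputs".
void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
                          InternalKey* smallest, InternalKey* largest) {
  assert(!inputs.empty());
  smallest->Clear();
  largest->Clear();
  for (size_t i = 0; i < inputs.size(); i++) {
    FileMetaData* f = inputs[i];
    if (i == 0) {
      *smallest = f->smallest;
      *largest = f->largest;
    } else {
      if (icmp_.Compare(f->smallest, *smallest) < 0) *smallest = f->smallest;
      if (icmp_.Compare(f->largest, *largest) > 0) *largest = f->largest;
    }
  }
}

void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
                           const std::vector<FileMetaData*>& inputs2,
                           InternalKey* smallest, InternalKey* largest) {
  std::vector<FileMetaData*> all = inputs1;
  all.insert(all.end(), inputs2.begin(), inputs2.end());
  GetRange(all, smallest, largest);
}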
Working backwards from inputs_[1], find every file in level `level` whose key range overlaps [all_start, all_limit], and record them in expanded0:
  // See if we can grow the number of inputs in "level" without
  // changing the number of "level+1" files we pick up.  In other words,
  // add as many level-n files as possible without pulling in any
  // additional level-(n+1) files.
  if (!c->inputs_[1].empty()) {
    std::vector<FileMetaData*> expanded0;
    // All files in `level` that overlap [all_start, all_limit] land in
    // expanded0, so expanded0 is a superset of inputs_[0].
    current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
  // Update the place where we will do the next compaction for this level.
  // We update this immediately instead of waiting for the VersionEdit
  // to be applied so that if the compaction fails, we will try a different
  // key range next time.  compact_pointer_ is what the next size
  // compaction relies on to choose the level-n files that participate.
  compact_pointer_[level] = largest.Encode().ToString();
  c->edit_.SetCompactPointer(level, largest);
}
else {
  if (!has_current_user_key ||  // first time we see this user key
      user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != 0) {
    // First occurrence of this user key: mark it with the maximum
    // sequence number so the newest entry can never be dropped.
    current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
    has_current_user_key = true;
    last_sequence_for_key = kMaxSequenceNumber;
  }

  // last_sequence_for_key holds the sequence number of the previous
  // (newer) entry for this user key.  If that previous entry's sequence
  // number is <= the smallest live snapshot, then this entry's sequence
  // number is certainly < the smallest live snapshot, so no reader can
  // ever observe it and it can be dropped.  Setting
  // last_sequence_for_key = kMaxSequenceNumber on the first occurrence
  // guarantees the first (newest) entry for each key is always kept.
  if (last_sequence_for_key <= compact->smallest_snapshot) {
    // Hidden by a newer entry for same user key
    drop = true;  // (A)
  } else if (ikey.type == kTypeDeletion &&
             ikey.sequence <= compact->smallest_snapshot &&
             compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
    // A deletion marker whose sequence number is <= the smallest
    // snapshot can itself be dropped once IsBaseLevelForKey confirms no
    // higher level still holds this user key:
    // For this user key:
    // (1) there is no data in higher levels
    // (2) data in lower levels will have larger sequence numbers
    // (3) data in layers that are being compacted here and have
    //     smaller sequence numbers will be dropped in the next
    //     few iterations of this loop (by rule (A) above).
    // Therefore this deletion marker is obsolete and can be dropped.
    drop = true;
  }
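A worked example of rule (A), with hypothetical data: suppose smallest_snapshot == 8 and the merged iterator yields three entries for the same user key, newest first:

// k@9 Put -> last_sequence_for_key was kMaxSequenceNumber: kept; last = 9
// k@7 Put -> last (9) >  smallest_snapshot (8): kept, because a reader
//            at snapshot 8 still resolves the key to k@7;    last = 7
// k@5 Put -> last (7) <= smallest_snapshot (8): dropped by rule (A),
//            since every live snapshot already sees k@7 or newer.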
if (!drop) {
  // Open output file if necessary
  if (compact->builder == nullptr) {
    status = OpenCompactionOutputFile(compact);
    if (!status.ok()) {
      break;
    }
  }
  if (compact->builder->NumEntries() == 0) {
    compact->current_output()->smallest.DecodeFrom(key);
  }
  compact->current_output()->largest.DecodeFrom(key);
  compact->builder->Add(key, input->value());  // the key survived; emit it
  // Close output file if it is big enough: once the output crosses the
  // size threshold (2 MB by default), finish this file and start a new one.
  if (compact->builder->FileSize() >=
      compact->compaction->MaxOutputFileSize()) {
    status = FinishCompactionOutputFile(compact, input);
    if (!status.ok()) {
      break;
    }
  }
}
input->Next();  // advance to the next key/value pair
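The 2 MB figure comes from the Options rather than a hard-coded constant: in the LevelDB source, MaxOutputFileSize() returns a value captured at construction time from MaxFileSizeForLevel, which currently just forwards to the target file size (options->max_file_size, 2 MB by default):

uint64_t Compaction::MaxOutputFileSize() const {
  return max_output_file_size_;  // initialized in the Compaction constructor
}

// From version_set.cc: every level currently shares the same target size.
static uint64_t MaxFileSizeForLevel(const Options* options, int level) {
  // We could vary per level to reduce number of files?
  return TargetFileSize(options);  // options->max_file_size, 2 MB by default
}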