leveldb学习记录-互斥锁、原子量

leveldb使用的仍然是C++标准库中的互斥量和条件变量,做了简单的封装port/port_stdcxx.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class DBImpl {
// Queue of writers.
std::deque<Writer*> writers_ GUARDED_BY(mutex_);
WriteBatch* tmp_batch_ GUARDED_BY(mutex_);
...
}

// Information kept for every waiting writer
struct DBImpl::Writer {
explicit Writer(port::Mutex* mu)
: batch(nullptr), sync(false), done(false), cv(mu) {}

Status status;
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
};

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;

MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}

// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* updates = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);

// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (updates == tmp_batch_) tmp_batch_->Clear();

versions_->SetLastSequence(last_sequence);
}

while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}

// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}

return status;
}

// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
mutex_.AssertHeld();
assert(!writers_.empty());
Writer* first = writers_.front();
WriteBatch* result = first->batch;
assert(result != nullptr);

size_t size = WriteBatchInternal::ByteSize(first->batch);

// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
if (size <= (128 << 10)) {
max_size = size + (128 << 10);
}

*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// Do not include a sync write into a batch handled by a non-sync write.
break;
}

if (w->batch != nullptr) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
// Do not make batch too big
break;
}

// Append to *result
if (result == first->batch) {
// Switch to temporary batch instead of disturbing caller's batch
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch);
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}

假设现在有编号为 [1, 2, 3, 4] 的四个线程基本同时调用写入操作,发生的事件如下:

  1. 每个线程中各自构造 Writer
  2. 1 号线程较快地构造了 MutexLock 拿到锁,[2, 3, 4] 则阻塞在此处;
  3. 1 号线程将写入请求插入双向队列中,跳过循环,继续向下走;
  4. 1 号线程执行 BuildBatchGroup,由于队列中只有自身一个请求,不会发生合并;
  5. 1 号线程执行 mutex_.Unlock() 释放锁,随后执行写入操作,完成后执行 mutex_.Lock() 再次获得锁;
  6. 1 号线程在循环中从双向队列里将写入请求弹出,最后通知队列顶的 2 号线程唤醒;
  7. 1号线程析构局部变量、释放锁。

在第 5 步发生释放锁的同时:

  1. 2 号线程获得锁,将写入请求插入双向队列中,由于请求不在队列顶端,进而进入循环、等待、释放锁;
  2. 3 号线程获得锁,将写入请求插入双向队列中,由于请求不在队列顶端,进而进入循环、等待、释放锁;
  3. 此时 1 号线程写入完成、执行 mutex_.Lock() 获得锁,4 号线程继续等待;
  4. 1 号线程执行结束、释放锁,2 号线程唤醒获得锁,执行 BuildBatchGroup 将队列中的 3 号线程中的写入请求合并;
  5. 2 号线程执行 mutex_.Unlock() 解锁,随后执行写入操作,完成后执行 mutex_.Lock() 再次获得锁。与此同时 4 号线程获得锁,将写入请求插入双向队列中,等待、释放锁;
  6. 2 号线程在循环中从双向队列里将写入请求弹出,将 3 号线程的写入请求标记为完成,尝试唤醒 3 号线程;
  7. 2 号线程析构局部变量、释放锁。3 号线程唤醒、获得锁,判断已完成,返回、释放锁;
  8. 4 号线程唤醒、获得锁,正常执行。

上述合并操作依赖写入时的释放锁操作,这使得其他线程有机会加入队列、然后等待,在下一次获得锁时合并队列中的其他写入请求。

LevelDB是一个多线程系统,会有后台线程负责sstable的compact,同时用户也可能采用多线程进行并发访问。因此需要一些机制来协调线程工作,控制memtable等共享资源的并发读写,主要通过如下三种机制来实现。在这里我们主要关注port/port_posix.h里的对应实现。

1
2
3
4
5
port::Mutex mutex_;

port::AtomicPointer shutting_down_;

port::CondVar bg_cv_; // Signalled when background work finishes

在leveldb中,通过调用pthread_mutex_init初始化互斥锁,mutexattr用于指定互斥锁属性,采用默认属性。主要有以下两种用法:一种是单纯的作为Mutex,另一个用法是封装在MutexLock中,在MutexLock的构造函数和析构函数中进行加锁和解锁。

主要有如下地方使用到了互斥锁:db/db_impl.cc 中DBImpl 有成员变量port::Mutex mutex_;此外在db/db_impl.cc,db/db_bench.cc,util/cache.cc,helpers/memenv/memenv.cc中还都使用了MutexLock。

在很多过程中都会用到互斥锁,比如:

l DBImpl析构中等待后台compaction结束的过程

l DBImpl::WriteLevel0Table中将memtable内容写入sstable时

l DBImpl::CompactMemTable中versions_->LogAndApply时

l DBImpl::CompactRange中访问versions_->current()时

l DBImpl::BackgroundCall()中

l DBImpl::OpenCompactionOutputFile中创建output file时

l DBImpl::NewInternalIterator

l DBImpl::Get

l DBImpl::GetSnapshot

l DBImpl::ReleaseSnapshot

l DBImpl::Write

l DBImpl::GetProperty

l DBImpl::GetApproximateSizes

l DB::Open

基本上所有的DB相关函数都使用到了该Mutex变量。因此理解这些函数中何时为何加锁解锁以及不加会有什么问题,对于理解函数实现是至关重要的。此外该Mutex变量还会被传入VersionSet::LogAndApply中,传入的目的主要是为了优化锁,在写入MANIFEST log时会进行解锁。

条件变量的使用

与互斥量不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使”条件成立”(给出条件成立信号)。

条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。

利用pthread_cond_t和Mutex实现CondVar。在LevelDB中有如下地方使用了CondVar:

port::CondVar bg_cv_;用于等待后台compaction线程的结束

port::CondVar cv;用于等待Writer完成

原子指针

原子变量即操作变量的操作是原子的,该操作不可再分,因此是线程安全的。使用原子变量的原因是多个线程对单个变量操作也会引起问题。原子变量只是保证单个变量在某一个操作过程的原子性,但是无法保证整个程序的安全性。当共享资源是位或整型变量,是一个完整的加锁体制对于一个简单的整数值看来过分了. 对于这样的情况,使用原子变量效率更高。

在LevelDB中使用的是AtomicPointer,用来提供对指针变量的原子性访问。有两种实现方式,一种是采用cstdatomic,一种是采用 MemoryBarrier。基于MemoryBarrier的实现要比cstdatomic快很多,关于MemoryBarrier的作用可参考 Why Memory Barrierbarrier内存屏蔽

可以看到MemoryBarrier()实际上就是如下的一条汇编语句:

asm volatile(“” : : : “memory”);

1)asm用于指示编译器在此插入汇编语句

2)volatile用于告诉编译器,严禁将此处的汇编语句与其它的语句重组合优化。即:原原本本按原来的样子处理这这里的汇编。

3)memory强制gcc编译器假设RAM所有内存单元均被汇编指令修改,这样cpu中的registers和cache中已缓存的内存单元中的数据将作废。cpu将不得不在需要的时候重新读取内存中的数据。这就阻止了cpu又将registers,cache中的数据用于去优化指令,而避免去访问内存。

4)””:::表示这是个空指令。

如下地方使用到了AtomicPointer:

db/skiplist.h

port::AtomicPointer next_[1];

port::AtomicPointer max_height_;

db/db_impl.h

port::AtomicPointer shutting_down_;

port::AtomicPointer has_imm_;


互斥,可以保护数据,避免资源竞争,那么互斥为什么会带来死锁呢?

常见两种典型的死锁情形:

1、线程自己将自己锁住,如果一个线程先后两次调用lock,由于第二次调用时,锁已被占用,该线程会挂起等待占用该锁的线程释放锁,而锁正是被自己占用的,该线程又被挂起没办法释放锁,就一直处于挂起等待状态了,形成了死锁。

2、多线程抢占锁资源造成死锁,如线程A获得了锁1,线程B获得了锁2,此时线程A试图调用lock获得锁2,需要挂起等待锁2被释放(也就是线程B释放锁2);此时线程B试图调用lock获得锁1,需要挂起等待锁1被释放(也就是线程A释放锁1)。这样,线程A和线程B都处于挂起状态了,形成死锁。