线程安全的多线程异步日志
需要一个队列来讲日志前端的数据传送到后端(日志线程),但这个队列不必是现成的 BlockingQueue<std::string>
,因为不用每次产生一条日志消息都通知接收方。
muduo 日志库采用的是双缓冲(double buffering)技术。
基本思路:
- 准备两块 buffer;
- 前端负责往 buffer A 填数据(日志消息);
- 后端负责将 buffer B 的数据写入文件;
- 当 buffer A 写满后,交换 A 和 B;
- 后端将 buffer A 的数据写入文件;
- 而前端则往 buffer B 填入心得日志消息;
- 循环往复……
实现
代码位置:muduo/base/AsyncLogging.cc
前端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| void AsyncLogging::append(const char* logline, int len) { muduo::MutexLockGuard lock(mutex_); if (currentBuffer_->avail() > len) { currentBuffer_->append(logline, len); } else { buffers_.push_back(std::move(currentBuffer_));
if (nextBuffer_) { currentBuffer_ = std::move(nextBuffer_); } else { currentBuffer_.reset(new Buffer); } currentBuffer_->append(logline, len); cond_.notify(); } }
|
后端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| void AsyncLogging::threadFunc() { assert(running_ == true); latch_.countDown(); LogFile output(basename_, rollSize_, false); BufferPtr newBuffer1(new Buffer); BufferPtr newBuffer2(new Buffer); newBuffer1->bzero(); newBuffer2->bzero(); BufferVector buffersToWrite; buffersToWrite.reserve(16); while (running_) { assert(newBuffer1 && newBuffer1->length() == 0); assert(newBuffer2 && newBuffer2->length() == 0); assert(buffersToWrite.empty());
{ muduo::MutexLockGuard lock(mutex_); if (buffers_.empty()) { cond_.waitForSeconds(flushInterval_); } buffers_.push_back(std::move(currentBuffer_)); currentBuffer_ = std::move(newBuffer1); buffersToWrite.swap(buffers_); if (!nextBuffer_) { nextBuffer_ = std::move(newBuffer2); } }
assert(!buffersToWrite.empty());
if (buffersToWrite.size() > 25) { char buf[256]; snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n", Timestamp::now().toFormattedString().c_str(), buffersToWrite.size()-2); fputs(buf, stderr); output.append(buf, static_cast<int>(strlen(buf))); buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end()); }
for (const auto& buffer : buffersToWrite) { output.append(buffer->data(), buffer->length()); }
if (buffersToWrite.size() > 2) { buffersToWrite.resize(2); }
if (!newBuffer1) { assert(!buffersToWrite.empty()); newBuffer1 = std::move(buffersToWrite.back()); buffersToWrite.pop_back(); newBuffer1->reset(); }
if (!newBuffer2) { assert(!buffersToWrite.empty()); newBuffer2 = std::move(buffersToWrite.back()); buffersToWrite.pop_back(); newBuffer2->reset(); }
buffersToWrite.clear(); output.flush(); } output.flush(); }
|
改进空间
目前我们一共准备了 4 块缓冲:currentBuffer_
nextBuffer_
newBuffer1
newBuffer2
以及缓冲队列 buffers_
。
如需进一步增加 buffer 数目,可以改用下面的数据结构:
1 2 3
| BufferPtr currentBuffer_; BufferVector emptyBuffers_; BufferVector fullBuffers_;
|