线程安全的多线程异步日志
需要一个队列来讲日志前端的数据传送到后端(日志线程),但这个队列不必是现成的 BlockingQueue<std::string>,因为不用每次产生一条日志消息都通知接收方。
muduo 日志库采用的是双缓冲(double buffering)技术。
基本思路:
- 准备两块 buffer;
 
- 前端负责往 buffer A 填数据(日志消息);
 
- 后端负责将 buffer B 的数据写入文件;
 
- 当 buffer A 写满后,交换 A 和 B;
 
- 后端将 buffer A 的数据写入文件;
 
- 而前端则往 buffer B 填入心得日志消息;
 
- 循环往复……
 
实现
代码位置:muduo/base/AsyncLogging.cc
前端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
   | void AsyncLogging::append(const char* logline, int len) {   muduo::MutexLockGuard lock(mutex_);   if (currentBuffer_->avail() > len)   {     currentBuffer_->append(logline, len);   }   else   {     buffers_.push_back(std::move(currentBuffer_));
      if (nextBuffer_)     {       currentBuffer_ = std::move(nextBuffer_);     }     else     {       currentBuffer_.reset(new Buffer);      }     currentBuffer_->append(logline, len);     cond_.notify();   } }
 
  | 
 
后端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
   | void AsyncLogging::threadFunc() {   assert(running_ == true);   latch_.countDown();   LogFile output(basename_, rollSize_, false);   BufferPtr newBuffer1(new Buffer);   BufferPtr newBuffer2(new Buffer);   newBuffer1->bzero();   newBuffer2->bzero();   BufferVector buffersToWrite;   buffersToWrite.reserve(16);   while (running_)   {     assert(newBuffer1 && newBuffer1->length() == 0);     assert(newBuffer2 && newBuffer2->length() == 0);     assert(buffersToWrite.empty());
      {       muduo::MutexLockGuard lock(mutex_);       if (buffers_.empty())         {         cond_.waitForSeconds(flushInterval_);       }       buffers_.push_back(std::move(currentBuffer_));       currentBuffer_ = std::move(newBuffer1);       buffersToWrite.swap(buffers_);       if (!nextBuffer_)       {         nextBuffer_ = std::move(newBuffer2);       }     }
      assert(!buffersToWrite.empty());
      if (buffersToWrite.size() > 25)     {       char buf[256];       snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",                Timestamp::now().toFormattedString().c_str(),                buffersToWrite.size()-2);       fputs(buf, stderr);       output.append(buf, static_cast<int>(strlen(buf)));       buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());     }
      for (const auto& buffer : buffersToWrite)     {              output.append(buffer->data(), buffer->length());     }
      if (buffersToWrite.size() > 2)     {              buffersToWrite.resize(2);     }
      if (!newBuffer1)     {       assert(!buffersToWrite.empty());       newBuffer1 = std::move(buffersToWrite.back());       buffersToWrite.pop_back();       newBuffer1->reset();     }
      if (!newBuffer2)     {       assert(!buffersToWrite.empty());       newBuffer2 = std::move(buffersToWrite.back());       buffersToWrite.pop_back();       newBuffer2->reset();     }
      buffersToWrite.clear();     output.flush();   }   output.flush(); }
 
  | 
 
改进空间
目前我们一共准备了 4 块缓冲:currentBuffer_ nextBuffer_ newBuffer1 newBuffer2 以及缓冲队列 buffers_。
如需进一步增加 buffer 数目,可以改用下面的数据结构:
1 2 3
   | BufferPtr       currentBuffer_;  BufferVector    emptyBuffers_;   BufferVector    fullBuffers_;   
 
  |