ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Muduo源码Base篇

2021-05-02 11:00:07  阅读:182  来源: 互联网

标签:Muduo const buffersToWrite int len char Base 源码 append


Muduo异步日志

先来看AsyncLogging类的定义

class AsyncLogging : noncopyable
{
 public:

  AsyncLogging(const string& basename,
               off_t rollSize,
               int flushInterval = 3);

  ~AsyncLogging()
  {
    if (running_)
    {
      stop();
    }
  }

  void append(const char* logline, int len);

  void start()
  {
    running_ = true;
    thread_.start();
    latch_.wait();
  }

  void stop() NO_THREAD_SAFETY_ANALYSIS
  {
    running_ = false;
    cond_.notify();
    thread_.join();
  }

 private:

  void threadFunc();

  typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer; //缓冲区大小 4000*1000
  typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
  typedef BufferVector::value_type BufferPtr;

  const int flushInterval_;//
  std::atomic<bool> running_;
  const string basename_;
  const off_t rollSize_;//滚动大小
  muduo::Thread thread_;
  muduo::CountDownLatch latch_;
  muduo::MutexLock mutex_;
  muduo::Condition cond_ GUARDED_BY(mutex_);
  BufferPtr currentBuffer_ GUARDED_BY(mutex_);
  BufferPtr nextBuffer_ GUARDED_BY(mutex_);
  BufferVector buffers_ GUARDED_BY(mutex_);
};
}  // namespace muduo

再看这个类具体实现之前 先来看里面用到的辅助类

class LogFile : noncopyable
{
 public:
  LogFile(const string& basename,
          off_t rollSize,
          bool threadSafe = true,
          int flushInterval = 3,
          int checkEveryN = 1024);
  ~LogFile();

  void append(const char* logline, int len);
  void flush();
  bool rollFile();

 private:
  void append_unlocked(const char* logline, int len);

  static string getLogFileName(const string& basename, time_t* now);

  const string basename_;
  const off_t rollSize_;//写入的大小
  const int flushInterval_;
  const int checkEveryN_;//该文件中的日志条数

  int count_;

  std::unique_ptr<MutexLock> mutex_;
  time_t startOfPeriod_;
  time_t lastRoll_;
  time_t lastFlush_;
  std::unique_ptr<FileUtil::AppendFile> file_;

  const static int kRollPerSeconds_ = 60*60*24;
};

其中FileUtil类如下

class AppendFile : noncopyable
{
 public:
  explicit AppendFile(StringArg filename);

  ~AppendFile();

  void append(const char* logline, size_t len);

  void flush();

  off_t writtenBytes() const { return writtenBytes_; }

 private:

  size_t write(const char* logline, size_t len);

  FILE* fp_;
  char buffer_[64*1024];
  off_t writtenBytes_;
};

}  // namespace FileUtil
}  // namespace muduo

其中StringArg类是一个传递c语言的字符串的类型的类定义如下

class StringArg // copyable
{
 public:
  StringArg(const char* str)
    : str_(str)
  { }

  StringArg(const string& str)
    : str_(str.c_str())
  { }

  const char* c_str() const { return str_; }

 private:
  const char* str_;
};

AppendFile的构造函数如下

FileUtil::AppendFile::AppendFile(StringArg filename)
  : fp_(::fopen(filename.c_str(), "ae")),  // 'e' for O_CLOEXEC
    writtenBytes_(0)
{
  assert(fp_);
  ::setbuffer(fp_, buffer_, sizeof buffer_);
  // posix_fadvise POSIX_FADV_DONTNEED ?
}

setbuffer设置文件流对应的缓冲区大小
如下例子



#include<iostream>
#include<cstring>
#include<unistd.h>
using namespace std;

int main(){

    char *buf=(char*)malloc(sizeof(char)*1024*10);
    setbuffer(stdout,buf,1024*10);
    char buffer[10];
    memset(buffer,'a',sizeof(buffer));
    for(int i=0;i<1000;++i)
        printf("%s",buffer);
    sleep(10);
    while(1);
}

此处永远不会有输出 因为标准输出属于行缓冲 现将缓冲区大小改为1024的10倍 总共输出了10000字节 缓冲区未满 而且没有换行符出现 程序是个死循环 无法结束 导致无法输出
一般而言 对于终端 诸如标准输入 标准输出属于行缓冲 遇到换行符或者缓冲区满了 才会输出
标准出错不带缓冲 以便直接打印错误信息

再来看AppendFile的构造函数 将缓冲区设置为buffer_的大小
append写入logline的len字节

void FileUtil::AppendFile::append(const char* logline, const size_t len)
{
  size_t written = 0;

  while (written != len)
  {
    size_t remain = len - written;
    size_t n = write(logline + written, remain);
    if (n != remain)
    {
      int err = ferror(fp_);
      if (err)
      {
        fprintf(stderr, "AppendFile::append() failed %s\n", strerror_tl(err));
        break;
      }
    }
    written += n;
  }

  writtenBytes_ += written;
}

write函数如下


size_t FileUtil::AppendFile::write(const char* logline, size_t len)
{
  // #undef fwrite_unlocked
  return ::fwrite_unlocked(logline, 1, len, fp_);
}

fwrite_unlocked为fwrite的线程不安全版本 至于这里为什么使用这个 还要等到分析AsyncLogging类才能揭晓
接着来看之前的LogFile类的构造函数

LogFile::LogFile(const string& basename,
                 off_t rollSize,
                 bool threadSafe,
                 int flushInterval,
                 int checkEveryN)
  : basename_(basename),//文件名
    rollSize_(rollSize),//最大字节
    flushInterval_(flushInterval),//刷新间隔 即flush间隔
    checkEveryN_(checkEveryN),//最大日志条数
    count_(0),//当前日志条数
    mutex_(threadSafe ? new MutexLock : NULL),
    startOfPeriod_(0),//前一天时间
    lastRoll_(0),//上次刷新时间
    lastFlush_(0)
{
//检查basename是否合法
  assert(basename.find('/') == string::npos);
  rollFile();
}
bool LogFile::rollFile()
{
  time_t now = 0;
  string filename = getLogFileName(basename_, &now);//获得文件名称
  time_t start = now / kRollPerSeconds_ * kRollPerSeconds_;即(now-now%KRollPerSeconds_)

  if (now > lastRoll_)
  {
    lastRoll_ = now;
    lastFlush_ = now;
    startOfPeriod_ = start;
    file_.reset(new FileUtil::AppendFile(filename));//换一个文件
    return true;
  }
  return false;
}
//生成日志的文件名
string LogFile::getLogFileName(const string& basename, time_t* now)
{
  string filename;
  filename.reserve(basename.size() + 64);
  filename = basename;

  char timebuf[32];
  struct tm tm;
  *now = time(NULL);
  gmtime_r(now, &tm); // FIXME: localtime_r ?
  strftime(timebuf, sizeof timebuf, ".%Y%m%d-%H%M%S.", &tm);
  filename += timebuf;

  filename += ProcessInfo::hostname();

  char pidbuf[32];
  snprintf(pidbuf, sizeof pidbuf, ".%d", ProcessInfo::pid());
  filename += pidbuf;

  filename += ".log";

  return filename;
}

向当前日志中添加信息append函数如下

void LogFile::append(const char* logline, int len)
{
  if (mutex_)
  {
    MutexLockGuard lock(*mutex_);
    append_unlocked(logline, len);
  }
  else
  {
    append_unlocked(logline, len);
  }
}

此处利用了MutexLockGuard上锁 因此append_unlocked也就不需要任何加锁操作 正是在这个函数种调用了AppendFile的append 继而调用fwrite_unlocked函数

void LogFile::append_unlocked(const char* logline, int len)
{
  file_->append(logline, len);

  if (file_->writtenBytes() > rollSize_)
  {
    rollFile();
  }
  else
  {
    ++count_;
    if (count_ >= checkEveryN_)
    {
      count_ = 0;
      time_t now = ::time(NULL);
      time_t thisPeriod_ = now / kRollPerSeconds_ * kRollPerSeconds_;
      if (thisPeriod_ != startOfPeriod_)
      {
        rollFile();
      }
      else if (now - lastFlush_ > flushInterval_)
      {
        lastFlush_ = now;
        file_->flush();
      }
    }
  }
}

当作为缓冲区的AppendFile的大小大与当前rollSize则需要重新生成一个新的日志文件
否则 更新日志条数 当前日志条数如果大于最大值
天数不一样则重新换一个日志写 或者当前与之前刷新的时间间隔大于设置的时间间隔 则刷新到磁盘中

注意!!当写入日志在文件中过快时 会导致实际写入的文件大小与预期的文件大小不一样 尽管每次都rollFile但是可能多次获得的filename是同一个 导致和预期大小差的较多

在多线程程序中常用的日志 并不是直接进行磁盘IO 而是采用异步日志的方法 单独开启一个线程去做日志的写入操作

现在继续来看AsynLogging类 采取双缓冲

AsyncLogging::AsyncLogging(const string& basename,
                           off_t rollSize,
                           int flushInterval)
  : flushInterval_(flushInterval),//flush时间间隔
    running_(false),//线程是否运行
    basename_(basename),
    rollSize_(rollSize),//日志大致大小 写入过快导致大小不符
    thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"),
    latch_(1),//必须的一步
    mutex_(),
    cond_(mutex_),
    currentBuffer_(new Buffer),
    nextBuffer_(new Buffer),
    buffers_()
{
  currentBuffer_->bzero();
  nextBuffer_->bzero();
  buffers_.reserve(16);
}

append函数

void AsyncLogging::append(const char* logline, int len)
{
  muduo::MutexLockGuard lock(mutex_);
  if (currentBuffer_->avail() > len)
  {
    currentBuffer_->append(logline, len);
  }
  else
  {
    buffers_.push_back(std::move(currentBuffer_));

    if (nextBuffer_)
    {
      currentBuffer_ = std::move(nextBuffer_);
    }
    else
    {
      currentBuffer_.reset(new Buffer); // Rarely happens
    }
    currentBuffer_->append(logline, len);
    cond_.notify();
  }
}

前端操作:当前buffer能容纳下数据则直接加入 否则利用下一块buffer 将其移为当前buffer
后端操作

void AsyncLogging::threadFunc()
{
  assert(running_ == true);
  latch_.countDown();
  LogFile output(basename_, rollSize_, false);
  BufferPtr newBuffer1(new Buffer);
  BufferPtr newBuffer2(new Buffer);
  newBuffer1->bzero();
  newBuffer2->bzero();
  BufferVector buffersToWrite;
  buffersToWrite.reserve(16);
  while (running_)
  {
    assert(newBuffer1 && newBuffer1->length() == 0);
    assert(newBuffer2 && newBuffer2->length() == 0);
    assert(buffersToWrite.empty());

    {
      muduo::MutexLockGuard lock(mutex_);
      //满足两种条件之一即可
      if (buffers_.empty())  // unusual usage!
      {
        cond_.waitForSeconds(flushInterval_);
      }
      buffers_.push_back(std::move(currentBuffer_));
      currentBuffer_ = std::move(newBuffer1);
      buffersToWrite.swap(buffers_);
      if (!nextBuffer_)
      {
        nextBuffer_ = std::move(newBuffer2);
      }
    }

    assert(!buffersToWrite.empty());

    if (buffersToWrite.size() > 25)
    {
      char buf[256];
      snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
               Timestamp::now().toFormattedString().c_str(),
               buffersToWrite.size()-2);
      fputs(buf, stderr);
      output.append(buf, static_cast<int>(strlen(buf)));
      buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
    }

    for (const auto& buffer : buffersToWrite)
    {
      // FIXME: use unbuffered stdio FILE ? or use ::writev ?
      output.append(buffer->data(), buffer->length());
    }

    if (buffersToWrite.size() > 2)
    {
      // drop non-bzero-ed buffers, avoid trashing
      buffersToWrite.resize(2);
    }

    if (!newBuffer1)
    {
      assert(!buffersToWrite.empty());
      newBuffer1 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer1->reset();
    }

    if (!newBuffer2)
    {
      assert(!buffersToWrite.empty());
      newBuffer2 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer2->reset();
    }

    buffersToWrite.clear();
    output.flush();
  }
  output.flush();
}

后端利用writeBufferVector缩短临界区 将当前buffer加入到buffers_中 newBuffer移为当前buffer 如果前端写入数据过多 导致nextBuffer也使用了 则将newBuffer2移为nextBuffer 交换buffers_和writeBufferVector 退出缓冲区 之后保证有两个缓冲区可用即newBuffer1 newBuffer2

标签:Muduo,const,buffersToWrite,int,len,char,Base,源码,append
来源: https://blog.csdn.net/Nintendo_Nerd/article/details/116159056

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有