当前位置 : 主页 > 编程语言 > java >

log&& buffevent&&内存池 1

来源:互联网 收集:自由互联 发布时间:2022-06-30
并发编程有两种:消息传递 以及共享内存 消息传递更容易保证程序的正确性!!! 线程同步原则: 尽可能降低共享对象!减少同步的场合 使用高级的并发结构- 任务队列 生产者消费

并发编程有两种:消息传递 以及共享内存

  消息传递更容易保证程序的正确性!!!

线程同步原则:

  • 尽可能降低共享对象!减少同步的场合
  • 使用高级的并发结构-  任务队列  生产者消费者模型   引用计数
  • 不到万不得已不要使用底层同步源语,只用非递归互斥和条件变量;至于读写锁信号量 这个 还是不用吧!! 读写锁 不一定比互斥锁效率高不好掌握!
  • 使用lock-free 库 以及atomic 库 以及CAS 库 不要使用内核级别的同步源语!!能有现成就用现成的库,自己写 就呵呵吧!!
  • 对于 spink-lock  mutex sem 等效率对比 就算了!! 一般 mutex 、条件变量、CAS、atomic、look-free、队列、倒计时-引用计数  足够解决问题

回到主题:多线程logging

  • 多线程多进程同时写log不出现race 需要解决并发问题
  • 能够应对多进程多线程产生的大量log 场景
  • 不阻塞正常执行流程

多线程异步log:

  线程安全好解决!多个线程写log 时使用全局mutex 或者每个线程写一个log文件,但是引入问题:所有线程抢占一个锁,或者业务线程阻塞在磁盘io上;

目前认为最好的办法是: 一个专有线程写log到磁盘等!! 业务线程发送log 到log-thread; log-thread 负责收集log并写入到磁盘文件等!!

所以肯定会涉及到‘队列’  ;业务进程会有缓冲队列缓存log 然后发送到log-thread!! log-thread 接收后主动写入!!

 

  • 往文件写日志的一个常见问题就是,万一程序崩溃,那么最后若干条日志往往就丢失了,因为日志库不能每条信息都flush硬盘,更不能每条日志都open/close文件,这样开销太大;所以需要定时定期刷新数据到磁盘
  • 在core dump sig-gandle 中执行刷新log函数;就能将尚未来得及写入磁盘的消息写入磁盘
  • 日志消息的格式主要有以下几个要素:
  • 尽量每条日志都占一行;
  • 时间戳精确到微秒/毫秒
  • 始终使用GTM时区
  • 打印线程id
  • 打印日志级别
  • 打印源文件名和行号

问题:线程安全与轮替是个难点。引入锁则带来复杂性以及性能问题  

  目前使用O_APPEND方式打开文件,这个标记让write写出的内容添加到文件末尾,移动文件指针与输出内容是原子的,由操作系统来保证原子性。因此这个标记保证在多线程/多进程调用write也能够保持输出的内容不会相互覆盖错乱,nginx的日志也利用了这个标记来达到多进程不干扰!!---但是---->每次 log 都会执行写操作,并且写操作是等锁的,写文件本身就比较耗时,再加上等锁的时间,会阻塞当前写 log 的业务工作进程or线程 

  轮替的过程中,需要关闭当前文件并打开新文件,让新的内容写到新文件中,在多线程环境下就需要锁来同步所有线程的日志输出操作,避免写入到不合法的文件描述符中!!!

  目前可以使用posix里的dup2来做到无锁轮替文件

//轮替时,首先重命名已打开的日志文件,保持打开状态,
rename(filename, newname);
//然后创建新的日志文件
fd = open(filename,...);
//使用dup2系统函数把fd(新)复制到fd_(旧)上
dup2(fd, fd_);
//关闭fd(新)
close(fd);

  其中dup2是原子操作,它会关闭fd_并且把fd_也指向fd打开的文件。因此fd_这个文件描述符总是保持打开状态,并且值不变,但是前后指向了不同的文件,完全不会影响其他线程调用write(fd_, ...)等操作。另一边write也是个原子操作,它与dup2不会交叉进行,因此保证了日志系统的正确性。

muduo async log 日志逻辑

  muduo 的异步日志是将写日志的操作放在单独的日志线程中,这里分为多个应用线程和专用的日志线程,同时有多块缓存,大概可以分为两大块缓存池,有收集日志的缓存池和专用于写日志的缓存池,收集日志的缓存池(buffer_vector)中有两块 buffer,称为 current_buffer 和 next_buffer,多个应用线程的日志都会写到 current_buffer(buffer_mutex)中,当 current_buffer 满的时候,将 current_buffer 的指针存到 buffer_vector 中,同时 current_buffer 的指针指向 next_buffer,这样应用线程可以继续写日志到 current_buffer 中,current_buffer 的指针存到 buffer_vector 后,会通知到日志线程,这里加上锁来控制 current_buffer(buffer_mutex),写日志的缓存池叫 write_buffer_vector,里面也有两块缓存 newBuffer1 和 newBuffer2,这时再将 current_buffer 的指针存入 buffer_vector 中,这时 buffer_vector 中有两块缓存的指针,之后将 buffer_vector 和 write_buffer_vector 交换,buffer_vector 就是空,同时 current_buffer 指针指向 newBuffer1,next_buffer 指针指向 newBuffer2,释放锁(buffer_mutex),这时 log 线程可以进行写操作,write_buffer_vector 的大小为 2,将里面的两块内存都写到文件中,同时 newBuffer1 和 newBuffer2 指针分别指向这两块内存,这样下次再执行交换操作时候 write_buffer_vector 和 newBuffer1 和 newBuffer2 都是空,一直循环执行这类操作,log 一般都是写文件时候时间比较长,将数据 memcpy 到 buffer 中耗时较少,这样可以大幅减少等锁的时间,提升 log 的性能。

void AsyncLogging::append(const char* logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
if (currentBuffer_->avail() > len)
{
currentBuffer_->append(logline, len);
}
else
{
buffers_.push_back(std::move(currentBuffer_));

if (nextBuffer_)
{
currentBuffer_ = std::move(nextBuffer_);
}
else
{
currentBuffer_.reset(new Buffer); // Rarely happens
}
currentBuffer_->append(logline, len);
cond_.notify();
}
}

void AsyncLogging::threadFunc()
{
assert(running_ == true);
latch_.countDown();
LogFile output(basename_, rollSize_, false);
BufferPtr newBuffer1(new Buffer);
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector buffersToWrite;
buffersToWrite.reserve(16);
while (running_)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());

{
muduo::MutexLockGuard lock(mutex_);
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_);
}
buffers_.push_back(std::move(currentBuffer_));
currentBuffer_ = std::move(newBuffer1);
buffersToWrite.swap(buffers_);
if (!nextBuffer_)
{
nextBuffer_ = std::move(newBuffer2);
}
}

assert(!buffersToWrite.empty());

if (buffersToWrite.size() > 2)
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
}

for (const auto& buffer : buffersToWrite)
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffer->data(), buffer->length());
}

if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
buffersToWrite.resize(2);
}

if (!newBuffer1)
{
assert(!buffersToWrite.empty());
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->reset();
}

if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}

buffersToWrite.clear();
output.flush();
}
output.flush();
}

 

 

    http代理服务器(3-4-7层代理)-网络事件库公共组件、内核kernel驱动 摄像头驱动 tcpip网络协议栈、netfilter、bridge 好像看过!!!! 但行好事 莫问前程 --身高体重180的胖子

上一篇:ngx内存池设计概阅
下一篇:没有了
网友评论