并发编程有两种:消息传递 以及共享内存
消息传递更容易保证程序的正确性!!!
线程同步原则:
- 尽可能降低共享对象!减少同步的场合
- 使用高级的并发结构- 任务队列 生产者消费者模型 引用计数
- 不到万不得已不要使用底层同步源语,只用非递归互斥和条件变量;至于读写锁信号量 这个 还是不用吧!! 读写锁 不一定比互斥锁效率高不好掌握!
- 使用lock-free 库 以及atomic 库 以及CAS 库 不要使用内核级别的同步源语!!能有现成就用现成的库,自己写 就呵呵吧!!
- 对于 spink-lock mutex sem 等效率对比 就算了!! 一般 mutex 、条件变量、CAS、atomic、look-free、队列、倒计时-引用计数 足够解决问题
回到主题:多线程logging
- 多线程多进程同时写log不出现race 需要解决并发问题
- 能够应对多进程多线程产生的大量log 场景
- 不阻塞正常执行流程
多线程异步log:
线程安全好解决!多个线程写log 时使用全局mutex 或者每个线程写一个log文件,但是引入问题:所有线程抢占一个锁,或者业务线程阻塞在磁盘io上;
目前认为最好的办法是: 一个专有线程写log到磁盘等!! 业务线程发送log 到log-thread; log-thread 负责收集log并写入到磁盘文件等!!
所以肯定会涉及到‘队列’ ;业务进程会有缓冲队列缓存log 然后发送到log-thread!! log-thread 接收后主动写入!!
- 往文件写日志的一个常见问题就是,万一程序崩溃,那么最后若干条日志往往就丢失了,因为日志库不能每条信息都flush硬盘,更不能每条日志都open/close文件,这样开销太大;所以需要定时定期刷新数据到磁盘
- 在core dump sig-gandle 中执行刷新log函数;就能将尚未来得及写入磁盘的消息写入磁盘
- 日志消息的格式主要有以下几个要素:
- 尽量每条日志都占一行;
- 时间戳精确到微秒/毫秒
- 始终使用GTM时区
- 打印线程id
- 打印日志级别
- 打印源文件名和行号
问题:线程安全与轮替是个难点。引入锁则带来复杂性以及性能问题
目前使用O_APPEND方式打开文件,这个标记让write写出的内容添加到文件末尾,移动文件指针与输出内容是原子的,由操作系统来保证原子性。因此这个标记保证在多线程/多进程调用write也能够保持输出的内容不会相互覆盖错乱,nginx的日志也利用了这个标记来达到多进程不干扰!!---但是---->每次 log 都会执行写操作,并且写操作是等锁的,写文件本身就比较耗时,再加上等锁的时间,会阻塞当前写 log 的业务工作进程or线程
轮替的过程中,需要关闭当前文件并打开新文件,让新的内容写到新文件中,在多线程环境下就需要锁来同步所有线程的日志输出操作,避免写入到不合法的文件描述符中!!!
目前可以使用posix里的dup2来做到无锁轮替文件
//轮替时,首先重命名已打开的日志文件,保持打开状态,rename(filename, newname);
//然后创建新的日志文件
fd = open(filename,...);
//使用dup2系统函数把fd(新)复制到fd_(旧)上
dup2(fd, fd_);
//关闭fd(新)
close(fd);
其中dup2是原子操作,它会关闭fd_并且把fd_也指向fd打开的文件。因此fd_这个文件描述符总是保持打开状态,并且值不变,但是前后指向了不同的文件,完全不会影响其他线程调用write(fd_, ...)等操作。另一边write也是个原子操作,它与dup2不会交叉进行,因此保证了日志系统的正确性。
muduo async log 日志逻辑
muduo 的异步日志是将写日志的操作放在单独的日志线程中,这里分为多个应用线程和专用的日志线程,同时有多块缓存,大概可以分为两大块缓存池,有收集日志的缓存池和专用于写日志的缓存池,收集日志的缓存池(buffer_vector)中有两块 buffer,称为 current_buffer 和 next_buffer,多个应用线程的日志都会写到 current_buffer(buffer_mutex)中,当 current_buffer 满的时候,将 current_buffer 的指针存到 buffer_vector 中,同时 current_buffer 的指针指向 next_buffer,这样应用线程可以继续写日志到 current_buffer 中,current_buffer 的指针存到 buffer_vector 后,会通知到日志线程,这里加上锁来控制 current_buffer(buffer_mutex),写日志的缓存池叫 write_buffer_vector,里面也有两块缓存 newBuffer1 和 newBuffer2,这时再将 current_buffer 的指针存入 buffer_vector 中,这时 buffer_vector 中有两块缓存的指针,之后将 buffer_vector 和 write_buffer_vector 交换,buffer_vector 就是空,同时 current_buffer 指针指向 newBuffer1,next_buffer 指针指向 newBuffer2,释放锁(buffer_mutex),这时 log 线程可以进行写操作,write_buffer_vector 的大小为 2,将里面的两块内存都写到文件中,同时 newBuffer1 和 newBuffer2 指针分别指向这两块内存,这样下次再执行交换操作时候 write_buffer_vector 和 newBuffer1 和 newBuffer2 都是空,一直循环执行这类操作,log 一般都是写文件时候时间比较长,将数据 memcpy 到 buffer 中耗时较少,这样可以大幅减少等锁的时间,提升 log 的性能。
void AsyncLogging::append(const char* logline, int len){
muduo::MutexLockGuard lock(mutex_);
if (currentBuffer_->avail() > len)
{
currentBuffer_->append(logline, len);
}
else
{
buffers_.push_back(std::move(currentBuffer_));
if (nextBuffer_)
{
currentBuffer_ = std::move(nextBuffer_);
}
else
{
currentBuffer_.reset(new Buffer); // Rarely happens
}
currentBuffer_->append(logline, len);
cond_.notify();
}
}
void AsyncLogging::threadFunc()
{
assert(running_ == true);
latch_.countDown();
LogFile output(basename_, rollSize_, false);
BufferPtr newBuffer1(new Buffer);
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector buffersToWrite;
buffersToWrite.reserve(16);
while (running_)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());
{
muduo::MutexLockGuard lock(mutex_);
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_);
}
buffers_.push_back(std::move(currentBuffer_));
currentBuffer_ = std::move(newBuffer1);
buffersToWrite.swap(buffers_);
if (!nextBuffer_)
{
nextBuffer_ = std::move(newBuffer2);
}
}
assert(!buffersToWrite.empty());
if (buffersToWrite.size() > 2)
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
}
for (const auto& buffer : buffersToWrite)
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffer->data(), buffer->length());
}
if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
buffersToWrite.resize(2);
}
if (!newBuffer1)
{
assert(!buffersToWrite.empty());
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->reset();
}
if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();
output.flush();
}
output.flush();
}
http代理服务器(3-4-7层代理)-网络事件库公共组件、内核kernel驱动 摄像头驱动 tcpip网络协议栈、netfilter、bridge 好像看过!!!!
但行好事 莫问前程
--身高体重180的胖子