来源:https://github.com/apache/rocketmq
读服务线程
主 broker 通过 selector 监听到通道内有数据可读时，调用 processReadEvent 处理读事件；处理完后计算距离上一次读到数据的时间间隔，若超过 haHousekeepingInterval（1000 * 20 毫秒，即 20 秒），则认为该连接已过期并退出循环。
/**
 * Read loop for one HA connection on the master side.
 *
 * Each round waits up to 1000 ms on the selector, then processes whatever the slave sent.
 * The loop ends when processReadEvent() fails, when an exception is thrown, or when nothing
 * has been read for longer than haHousekeepingInterval.
 *
 * Afterwards it tears the connection down:
 * - stops this service and the write service;
 * - unregisters the connection and decrements the connection count;
 * - closes the selector and the socket channel.
 */
public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            boolean ok = this.processReadEvent();
            if (!ok) {
                HAConnection.log.error("processReadEvent error");
                break;
            }

            // Expire the connection if the slave has been silent for too long.
            long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
            if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                break;
            }
        } catch (Exception e) {
            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    this.makeStop();
    writeSocketService.makeStop();
    haService.removeConnection(HAConnection.this);
    HAConnection.this.haService.getConnectionCount().decrementAndGet();

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    // Close each resource separately: a failure closing the selector must not leak the socket.
    try {
        this.selector.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }
    try {
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}
处理读事件的数据：判断通道内新读到的数据是否达到 8 个字节，因为从 broker 上报的偏移量是一个 long 类型，占 8 个字节。读取当前 buffer 位置之前最后一个完整的 8 字节（按 8 字节对齐）作为偏移量，然后把 processPostion 设置为这个对齐后的位置。当 buffer 没有剩余空间时会被重置，processPostion 也随之清零。slaveRequestOffset 的默认值为 -1，所以刚开始时它小于 0，会被赋值为从 broker 请求的第一个偏移量，从 broker 初次启动时这个值是 0。当然，如果中途从 broker 重启并重新建立连接，这个值就不会是 0 了，毕竟那时从 broker 已经有了很多消息。
/**
 * Drains bytes the slave has sent and records the newest 8-byte offset it reported.
 *
 * Reading stops after three consecutive zero-byte reads or when the buffer fills up.
 * Each time at least one new complete 8-byte value is available:
 * - the newest one becomes slaveAckOffset (and, the first time, slaveRequestOffset);
 * - the HA service is notified.
 *
 * @return false if the channel reported end-of-stream or a read threw IOException, true otherwise
 */
private boolean processReadEvent() {
    int zeroReadStreak = 0;

    // A full buffer is reused from the start: flip() moves the position back to 0.
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPostion = 0;
    }

    while (this.byteBufferRead.hasRemaining()) {
        try {
            final int bytesRead = this.socketChannel.read(this.byteBufferRead);
            if (bytesRead < 0) {
                log.error("read socket[" + HAConnection.this.clientAddr + "] <0");
                return false;
            }
            if (bytesRead == 0) {
                zeroReadStreak++;
                if (zeroReadStreak >= 3) {
                    break;
                }
                continue;
            }

            zeroReadStreak = 0;
            this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();

            final int writePos = this.byteBufferRead.position();
            if (writePos - this.processPostion < 8) {
                continue;
            }

            // Only the newest complete 8-byte value is used; align down to an 8-byte boundary.
            final int alignedEnd = writePos - (writePos % 8);
            final long reportedOffset = this.byteBufferRead.getLong(alignedEnd - 8);
            this.processPostion = alignedEnd;

            HAConnection.this.slaveAckOffset = reportedOffset;
            if (HAConnection.this.slaveRequestOffset < 0) {
                HAConnection.this.slaveRequestOffset = reportedOffset;
                log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + reportedOffset);
            }

            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }
    return true;
}
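保存从 broker 上报的最大偏移量 push2SlaveMaxOffset。push2SlaveMaxOffset 默认为 0，而从 broker 第一次上报的偏移量也是 0；由于只有当 offset 大于当前值时才会通过 CAS 更新，所以第一次调用既不会更新该值，也不会唤醒 groupTransferService。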
/**
 * Raises push2SlaveMaxOffset to {@code offset} if it is larger than the current value.
 *
 * The update is a lock-free CAS retry loop. Only the thread whose CAS succeeds wakes the
 * group transfer service; if the stored value is already >= offset, nothing happens.
 *
 * @param offset the offset most recently acknowledged by a slave
 */
public void notifyTransferSome(final long offset) {
    long current = this.push2SlaveMaxOffset.get();
    while (offset > current) {
        if (this.push2SlaveMaxOffset.compareAndSet(current, offset)) {
            this.groupTransferService.notifyTransferSome();
            break;
        }
        // Lost the race: re-read and retry while our offset is still larger.
        current = this.push2SlaveMaxOffset.get();
    }
}
写服务线程
判断从 broker 是否已经上报偏移量：上报之后 slaveRequestOffset 就不再是 -1（前面的读线程已经把它设置为上报的值，从 broker 初次启动时为 0）。nextTransferFromWhere 默认为 -1。当从 broker 第一次请求的偏移量为 0 时，获取主 broker commitLog 的最大偏移量，并把 nextTransferFromWhere 设置为该偏移量所在文件的起始偏移量；如果还没有任何消息，这个值就是 0，也就是第一个文件的起始偏移量。否则，就设置为从 broker（例如重启后）第一次请求的偏移量。
public void run() { HAConnection.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); if (-1 == HAConnection.this.slaveRequestOffset) { Thread.sleep(10); continue; } if (-1 == this.nextTransferFromWhere) { if (0 == HAConnection.this.slaveRequestOffset) { long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset(); masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() .getMapedFileSizeCommitLog()); if (masterOffset <0) { masterOffset = 0; } this.nextTransferFromWhere = masterOffset; } else { this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset; } log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr + "], and slave request " + HAConnection.this.slaveRequestOffset); } if (this.lastWriteOver) { long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() .getHaSendHeartbeatInterval()) { // Build Header this.byteBufferHeader.position(0); this.byteBufferHeader.limit(headerSize); this.byteBufferHeader.putLong(this.nextTransferFromWhere); this.byteBufferHeader.putInt(0); this.byteBufferHeader.flip(); this.lastWriteOver = this.transferData(); if (!this.lastWriteOver) continue; } } else { this.lastWriteOver = this.transferData(); if (!this.lastWriteOver) continue; } SelectMappedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere); if (selectResult != null) { int size = selectResult.getSize(); if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) { size = 
HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize(); } long thisOffset = this.nextTransferFromWhere; this.nextTransferFromWhere += size; selectResult.getByteBuffer().limit(size); this.selectMappedBufferResult = selectResult; // Build Header this.byteBufferHeader.position(0); this.byteBufferHeader.limit(headerSize); this.byteBufferHeader.putLong(thisOffset); this.byteBufferHeader.putInt(size); this.byteBufferHeader.flip(); this.lastWriteOver = this.transferData(); } else { HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100); } } catch (Exception e) { HAConnection.log.error(this.getServiceName() + " service has exception.", e); break; } } HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable(); if (this.selectMappedBufferResult != null) { this.selectMappedBufferResult.release(); } this.makeStop(); readSocketService.makeStop(); haService.removeConnection(HAConnection.this); SelectionKey sk = this.socketChannel.keyFor(this.selector); if (sk != null) { sk.cancel(); } try { this.selector.close(); this.socketChannel.close(); } catch (IOException e) { HAConnection.log.error("", e); } HAConnection.log.info(this.getServiceName() + " service end");}
lastWriteOver 默认为 true，表示上一次给从 broker 的数据同步是否已经完成。若已完成，则判断距上一次写入的时间是否超过心跳间隔 haSendHeartbeatInterval（1000 * 5 毫秒）。超过的话，就给从 broker 发送一个只有消息头、不带消息体的心跳包。消息头写入网络通道后，由于 selectMappedBufferResult 为空，transferData 不会再写消息体。
private boolean transferData() throws Exception { int writeSizeZeroTimes = 0; // Write Header while (this.byteBufferHeader.hasRemaining()) { int writeSize = this.socketChannel.write(this.byteBufferHeader); if (writeSize > 0) { writeSizeZeroTimes = 0; this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); } else if (writeSize == 0) { if (++writeSizeZeroTimes >= 3) { break; } } else { throw new Exception("ha master write header error <0"); } } if (null == this.selectMappedBufferResult) { return !this.byteBufferHeader.hasRemaining(); } writeSizeZeroTimes = 0; // Write Body if (!this.byteBufferHeader.hasRemaining()) { while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer()); if (writeSize > 0) { writeSizeZeroTimes = 0; this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); } else if (writeSize == 0) { if (++writeSizeZeroTimes >= 3) { break; } } else { throw new Exception("ha master write body error <0"); } } } boolean result = !this.byteBufferHeader.hasRemaining() if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { this.selectMappedBufferResult.release(); this.selectMappedBufferResult = null; } return result;}
当有消息时，从 commitLog 文件中获取消息的 buffer。第一次是从偏移量 0 开始查找；当 offset 为 0 时，getData 会传入 returnFirstOnNotFound = true 去查找文件。
/**
 * Returns commit-log data starting at the given physical offset.
 *
 * @param offset physical offset in the commit log
 * @return the selected buffer, or null when the store has shut down or nothing is found
 */
public SelectMappedBufferResult getCommitLogData(final long offset) {
    if (!this.shutdown) {
        return this.commitLog.getData(offset);
    }
    log.warn("message store has shutdown, so getPhyQueueData is forbidden");
    return null;
}

/**
 * Looks up data at {@code offset}.
 *
 * Passes returnFirstOnNotFound = true exactly when offset is 0.
 */
public SelectMappedBufferResult getData(final long offset) {
    final boolean fallBackToFirst = offset == 0;
    return this.getData(offset, fallBackToFirst);
}

/**
 * Finds the mapped file containing {@code offset} and selects its data from the offset's
 * position within that file.
 *
 * @return the selected buffer, or null if no mapped file is found
 */
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    final MappedFile target = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (target == null) {
        return null;
    }
    // Position of the requested offset inside its file.
    final int posInFile = (int) (offset % fileSize);
    return target.selectMappedBuffer(posInFile);
}
找到文件后，只有当要查找的位置大于等于 0 且小于可读位置 readPosition 时才有数据：截取从该位置开始到 readPosition 为止的 ByteBuffer，最后返回查询到的消息结果。
/**
 * Returns a slice of this file covering [pos, readPosition).
 *
 * Returns null when {@code pos} lies outside [0, readPosition), or when hold() fails.
 * hold() presumably takes a reference that SelectMappedBufferResult.release() drops — confirm.
 *
 * @param pos position inside this file
 */
public SelectMappedBufferResult selectMappedBuffer(int pos) {
    int readPosition = getReadPosition();
    // Only positions that already contain readable data can be selected.
    if (pos < readPosition && pos >= 0 && this.hold()) {
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        byteBuffer.position(pos);
        int size = readPosition - pos;
        ByteBuffer byteBufferNew = byteBuffer.slice();
        byteBufferNew.limit(size);
        return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
    }
    return null;
}

/**
 * @param startOffset physical offset of the first byte in {@code byteBuffer}
 * @param byteBuffer  view over the selected bytes
 * @param size        number of selected bytes
 * @param mappedFile  file the bytes belong to
 */
public SelectMappedBufferResult(long startOffset, ByteBuffer byteBuffer, int size, MappedFile mappedFile) {
    this.startOffset = startOffset;
    this.byteBuffer = byteBuffer;
    this.size = size;
    this.mappedFile = mappedFile;
}
判断消息的查询结果是否为空，再判断数据大小是否超过 haTransferBatchSize（1024 * 32）的限制。每次最多只能同步这么多字节，尽管单条消息的最大大小为 maxMessageSize = 1024 * 1024 * 4。接着记录本次消息头中的起始偏移量，并把下一次的传输偏移量加上本次的大小保存起来，然后调用 transferData 继续给从 broker 同步数据，这次不仅包括消息头，也包括消息体。此时 selectMappedBufferResult 不为空，消息体全部写入通信通道后会释放 buffer 资源，并返回传输结果。
从broker读取消息并保存
当通道有数据传输过来时触发读事件。消息头是 12 个字节：8 个字节代表偏移量，4 个字节代表消息体长度。第一次如果主 broker 发送的只是心跳包，那么总共只会读到 12 个字节，没有消息体。
private boolean dispatchReadRequest() { final int msgHeaderSize = 8 + 4; // phyoffset + size int readSocketPos = this.byteBufferRead.position(); while (true) { int diff = this.byteBufferRead.position() - this.dispatchPostion; if (diff >= msgHeaderSize) { long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion); int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8); long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); if (slavePhyOffset != 0) { if (slavePhyOffset != masterPhyOffset) { log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + slavePhyOffset + " MASTER: " + masterPhyOffset); return false; } } if (diff >= (msgHeaderSize + bodySize)) { byte[] bodyData = new byte[bodySize]; this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize); this.byteBufferRead.get(bodyData); HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); this.byteBufferRead.position(readSocketPos); this.dispatchPostion += msgHeaderSize + bodySize; if (!reportSlaveMaxOffsetPlus()) { return false; } continue; } } if (!this.byteBufferRead.hasRemaining()) { this.reallocateByteBuffer(); } break; } return true;}
虽然没有消息体（bodySize 为 0），但它同样满足 diff >= (msgHeaderSize + bodySize) 的条件，于是从 buffer 中读取（长度为 0 的）消息体并追加到 commitLog 文件中；当有消息时也是同样的操作。随后把 buffer 的 position 恢复为读取前的位置，并移动 dispatchPostion。最后判断 commitLog 的最大偏移量是否大于已经向主 broker 报告过的偏移量；如果是，则更新报告偏移量，并重新向主 broker 报告最新的偏移量。
/**
 * Appends replicated commit-log bytes.
 *
 * Refused (returns false) after shutdown. On success the reput service is woken;
 * on failure an error is logged.
 *
 * @param startOffset physical offset the data starts at
 * @param data        raw commit-log bytes
 * @return true if the data was appended
 */
public boolean appendToCommitLog(long startOffset, byte[] data) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
        return false;
    }

    boolean result = this.commitLog.appendData(startOffset, data);
    if (result) {
        this.reputMessageService.wakeup();
    } else {
        log.error("appendToPhyQueue failed " + startOffset + " " + data.length);
    }

    return result;
}

/**
 * Appends {@code data} to the last mapped file for {@code startOffset}, under putMessageLock.
 *
 * @return false if no mapped file could be obtained or the append failed
 */
public boolean appendData(long startOffset, byte[] data) {
    putMessageLock.lock();
    try {
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
        if (null == mappedFile) {
            log.error("appendData getLastMappedFile error " + startOffset);
            return false;
        }

        return mappedFile.appendMessage(data);
    } finally {
        putMessageLock.unlock();
    }
}

/**
 * Writes {@code data} at the current write position of this file.
 *
 * @return true if the data fit in the file and was written completely; false if it does not
 *         fit or the write failed, in which case wrotePosition is left unchanged
 */
public boolean appendMessage(final byte[] data) {
    int currentPos = this.wrotePosition.get();

    if ((currentPos + data.length) > this.fileSize) {
        return false;
    }

    try {
        this.fileChannel.position(currentPos);
        ByteBuffer buffer = ByteBuffer.wrap(data);
        // A channel write is not contractually guaranteed to consume every byte; loop until drained.
        while (buffer.hasRemaining()) {
            this.fileChannel.write(buffer);
        }
    } catch (Throwable e) {
        log.error("Error occurred when append message to mappedFile.", e);
        // Never advance the write position over bytes that were not persisted.
        return false;
    }

    this.wrotePosition.addAndGet(data.length);
    return true;
}
主broker接收从报告最新的偏移量
这时 slaveAckOffset 不再为 0，notifyTransferSome 会把 push2SlaveMaxOffset 更新为更大的值，并通过 notifyTransferObject 唤醒正在等待 push2SlaveMaxOffset 的线程。刚开始的时候，主线程通过 putRequest 提交请求，处理提交请求的线程随后在 this.notifyTransferObject.waitForRunning(1000) 上陷入等待。主线程唤醒写线程之后，就开始同步等待，直到从 broker 上报偏移量后把它唤醒。
// Wakes whichever thread is blocked on notifyTransferObject.
public void notifyTransferSome() {
    this.notifyTransferObject.wakeup();
}

// Sets the pending-notification flag and notifies one waiter,
// but only if no notification is already pending (hasNotified).
public void wakeup() {
    synchronized (this) {
        if (!this.hasNotified) {
            this.hasNotified = true;
            this.notify();
        }
    }
}

// Call-site fragment (presumably the commit-log put path — confirm).
// The request waits until the replicated offset reaches the end of the message just written.
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);

// Delegates the request to the group transfer service.
public void putRequest(final CommitLog.GroupCommitRequest request) {
    this.groupTransferService.putRequest(request);
}

// Call-site fragment (continued): wake every waiting HA write thread,
// then block up to syncFlushTimeout for the outcome.
service.getWaitNotifyObject().wakeupAll();
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
当初放入提交请求时触发了读写请求列表的交换（swapRequests）。这里会等待，直到从 broker 报告的偏移量 push2SlaveMaxOffset 大于等于本次提交请求的偏移量（nextOffset），或者等待超时。
private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();private volatile List requestsWrite = new ArrayList();private volatile List requestsRead = new ArrayList();public synchronized void putRequest(final CommitLog.GroupCommitRequest request) { synchronized (this.requestsWrite) { this.requestsWrite.add(request); } if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify }}private void swapRequests() { List tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp;}private void doWaitTransfer() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { for (CommitLog.GroupCommitRequest req : this.requestsRead) { boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); for (int i = 0; !transferOK i++) { this.notifyTransferObject.waitForRunning(1000); transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); } if (!transferOK) { log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); } req.wakeupCustomer(transferOK); } this.requestsRead.clear(); } }}public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.waitForRunning(10); this.doWaitTransfer(); } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); } } log.info(this.getServiceName() + " service end");}@Overrideprotected void onWaitEnd() { this.swapRequests();}
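等待/唤醒机制：waitForRunning 在没有待处理通知时最多等待 interval 毫秒，返回前清除通知标志并回调 onWaitEnd()（GroupTransferService 正是在 onWaitEnd 中交换读写请求列表的）。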
/**
 * Waits for a notification or for {@code interval} milliseconds, whichever comes first.
 *
 * A notification that is already pending (hasNotified) is consumed without blocking.
 * In every case, while still holding the monitor, the flag is cleared and onWaitEnd() is invoked.
 *
 * @param interval maximum wait in milliseconds
 */
protected void waitForRunning(long interval) {
    synchronized (this) {
        try {
            if (!this.hasNotified) {
                this.wait(interval);
            }
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            this.hasNotified = false;
            this.onWaitEnd();
        }
    }
}
写线程在没有消息可同步的时候会超时等待
// Call site (WriteSocketService.run): park the write thread for up to 100 ms when there is no data to send.
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);

/**
 * Per-thread wait.
 *
 * If the calling thread's flag in waitingThreadTable is already true, the flag is reset and
 * the method returns immediately. Otherwise it waits up to {@code interval} ms.
 * Either way the caller's flag is left false and onWaitEnd() is invoked.
 *
 * @param interval maximum wait in milliseconds
 */
public void allWaitForRunning(long interval) {
    long currentThreadId = Thread.currentThread().getId();
    synchronized (this) {
        Boolean notified = this.waitingThreadTable.get(currentThreadId);
        // A pending notification for this thread: consume it without blocking.
        if (notified != null && notified) {
            this.waitingThreadTable.put(currentThreadId, false);
            this.onWaitEnd();
            return;
        }

        try {
            this.wait(interval);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            // Registers (or resets) this thread as waiting-but-not-notified.
            this.waitingThreadTable.put(currentThreadId, false);
            this.onWaitEnd();
        }
    }
}
下面是当有新消息时主线程唤醒所有等待的写线程
/**
 * Marks every thread registered in waitingThreadTable as notified.
 *
 * Calls notifyAll() if at least one of them was not already notified. The flag is written
 * back into the table, so a thread that is not blocked right now sees the pending
 * notification on its next allWaitForRunning() call instead of sleeping a full interval.
 */
public void wakeupAll() {
    synchronized (this) {
        boolean needNotify = false;

        for (java.util.Map.Entry<Long, Boolean> entry : this.waitingThreadTable.entrySet()) {
            needNotify = needNotify || !entry.getValue();
            // Assigning the loop variable alone (value = true) would never update the table.
            entry.setValue(true);
        }

        if (needNotify) {
            this.notifyAll();
        }
    }
}
主线程在等待同步成功,超时时间为syncFlushTimeout = 1000 * 5
/**
 * Blocks the caller until countDownLatch is released (wakeupCustomer counts it down),
 * or until {@code timeout} milliseconds elapse.
 *
 * @param timeout maximum wait in milliseconds
 * @return the current flushOK value; false if the wait was interrupted
 */
public boolean waitForFlush(long timeout) {
    try {
        this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
        return this.flushOK;
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
        // Restore the interrupt status so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        return false;
    }
}
主线程是通过 notifyTransferObject 间接被唤醒的。GroupTransferService 等待从 broker 报告最大偏移量时，最多循环 5 次、每次最多等待 1000ms；无论同步是否成功，最终都会调用 wakeupCustomer 唤醒主线程。
/**
 * Publishes the outcome of a request and releases the thread blocked in waitForFlush(...).
 *
 * The result is stored before the latch is counted down, so the released waiter reads it.
 * Do not reorder these two statements.
 *
 * @param flushOK outcome reported to the waiter; on the HA path, whether push2SlaveMaxOffset
 *                reached the request's next offset in time
 */
public void wakeupCustomer(final boolean flushOK) {
    this.flushOK = flushOK;
    this.countDownLatch.countDown();
}