Redis从应用维度有:缓存使用、集群运用、数据结构的巧妙使用;
Redis从系统维度有:可以归类为三类:
- 高性能:线程模型、网络 IO 模型、数据结构、持久化机制;
- 高可用:主从复制、哨兵集群;
- 高拓展:Cluster 分片集群
Redis 为了高性能,从各方各面都进行了优化。根据官方数据,Redis 的 QPS 可以达到约 100000(每秒请求数),有兴趣的可以参考官方的基准程序测试《How fast is Redis?》,官方地址:https://redis.io/topics/benchmarks
横轴是连接数,纵轴是 QPS。此时,这张图反映了一个数量级
二、Redis为什么这么快一般我们在分析一个软件性能的时候会从几个主要方面进行分析:存储方式、CPU、和网络交互;Redis 的高性能主要依赖于几个方面。
- C 语言实现,C 语言在一定程度上还是比 Java 语言性能要高一些,因为 C 语言不需要经过 JVM 进行翻译。
- Redis是基于内存操作,需要的时候需要我们手动持久化到硬盘中
- Redis高效数据结构,对数据的操作也比较简单
- Redis是单线程模型,从而避开了多线程中上下文频繁切换的操作
- 使用多路I/O复用模型,非阻塞I/O
- 使用底层模型不同,它们之间底层实现方式以及与客户端之间通信的应用协议不一样,Redis直接自己构建了VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求
下面分别从上述几个方面进行展开说明,先来看网络I/O的多路复用模型。
三、从请求处理开始分析当我们在客户端向Redis Server发送一条指令,并且得到Redis回复的整个过程中,Redis做了什么呢?
要处理命令,则redis必须完整地接收客户端的请求,并将命令解析出来,再将结果读出来,通过网络回写到客户端。整个工序分为以下几个部分:
-
接收,通过TCP接收到命令,可能会历经多次TCP包、ack、IO操作
-
解析,将命令取出来
-
执行,到对应的地方将value读出来
-
返回,将value通过TCP返回给客户端,如果value较大,则IO负荷会更重
其中解析和执行是纯cpu/内存操作,而接收和返回主要是IO操作,首先我们先来看通信的过程。
四、网络IO通信原理首先,对于TCP通信来说,每个TCP Socket的内核中都有一个发送缓冲区和一个接收缓冲区
接收缓冲区把数据缓存到内核,若应用进程一直没有调用Socket的read方法进行读取,那么该数据会一直被缓存在接收缓冲区内。不管进程是否读取Socket,对端发来的数据都会经过内核接收并缓存到Socket的内核接收缓冲区。
read所要做的工作,就是把内核接收缓冲区中的数据复制到应用层用户的Buffer里。
进程调用Socket的send发送数据的时候,一般情况下是将数据从应用层用户的Buffer里复制到Socket的内核发送缓冲区,然后send就会在上层返回。换句话说,send返回时,数据不一定会被发送到对端。
网卡中的缓冲区既不属于内核空间,也不属于用户空间。它属于硬件缓冲,允许网卡与操作系统之间有个缓冲;内核缓冲区在内核空间,在内存中,用于内核程序,做为读自或写往硬件的数据缓冲区;用户缓冲区在用户空间,在内存中,用于用户程序,做为读自或写往硬件的数据缓冲区
网卡芯片收到网络数据会以中断的方式通知CPU,我有数据了,存在我的硬件缓冲里了,来读我啊。CPU收到这个中断信号后,会调用相应的驱动接口函数从网卡的硬件缓冲里把数据读到内核缓冲区,正常情况下会向上传递给TCP/IP模块一层一层的处理。
BIO模型Redis的通信采用的是多路复用机制,什么是多路复用机制呢? 由于Redis是C语言实现,为了简化大家的理解,我们采用Java语言来描述这个过程。 在理解多路复用之前,我们先来了解一下BIO。
BIO模型 在Java中,如果要实现网络通信,我们会采用Socket套接字来完成。
Socket这不是一个协议,而是一个通信模型。其实它最初是BSD发明的,主要用来一台电脑的两个进程 间通信,然后把它用到了两台电脑的进程间通信。所以,可以把它简单理解为进程间通信,不是什么高 级的东西。主要做的事情不就是:
- A发包:发请求包给某个已经绑定的端口(所以我们经常会访问这样的地址182.13.15.16:1235, 1235就是端口);收到B的允许;然后正式发送;发送完了,告诉B要断开链接;收到断开允许, 马上断开,然后发送已经断开信息给B。
- B收包:绑定端口和IP;然后在这个端口监听;接收到A的请求,发允许给A,并做好接收准备,主 要就是清理缓存等待接收新数据;然后正式接收;接受到断开请求,允许断开;确认断开后,继续 监听其它请求。
可见,Socket其实就是I/O操作,Socket并不仅限于网络通信,在网络通信中,它涵盖了网络层、传输 层、会话层、表示层、应用层——其实这都不需要记,因为Socket通信时候用到了IP和端口,仅这两个 就表明了它用到了网络层和传输层;而且它无视多台电脑通信的系统差别,所以它涉及了表示层;一般 Socket都是基于一个应用程序的,所以会涉及到会话层和应用层。
构建基础的BIO通信模型 ,BIO有什么弊端呢? 当服务端收到客户端的请求后,不直接返回,而是等待20s。
public class BIOServerSocket { //先定义一个端口号,这个端口的值是可以自己调整的。 static final int DEFAULT_PORT = 8080; public static void main(String[] args) throws IOException, InterruptedException { ServerSocket serverSocket = null; serverSocket = new ServerSocket(DEFAULT_PORT); System.out.println("启动服务,监听端口:" + DEFAULT_PORT); while (true) { //case1: 增加循环,允许循环接收请求 Socket socket = serverSocket.accept(); System.out.println("客户端:" + socket.getPort() + "已连接"); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader (socket.getInputStream())); String clientStr = bufferedReader.readLine(); //读取一行信息 System.out.println("客户端发了一段消息:" + clientStr); Thread.sleep(20000); //case2: 修改:增加等待时间 BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter (socket.getOutputStream())); bufferedWriter.write("我已经收到你的消息了\n"); bufferedWriter.flush(); //清空缓冲区触发消息发送 } } }
这个情况会导致一个问题,如果服务端在同一个时刻只能处理一个客户端的连接,而如果一个网站同时 有1000个用户访问,那么剩下的999个用户都需要等待,而这个等待的耗时取决于前面的请求的处理时长.
基于多线程优化BIO 为了让服务端能够同时处理更多的客户端连接,避免因为某个客户端连接阻塞导致后续请求被阻塞,于是引入多线程技术。如图所示,当引入了多线程之后,每个客户端的链接(Socket),我们可以直接给到线程池去执 行,而由于这个过程是异步的,所以并不会同步阻塞影响后续链接的监听,因此在一定程度上可以提升 服务端链接的处理数量。public class BIOServerSocketWithThread { static ExecutorService executorService= Executors.newFixedThreadPool (10); public static void main(String[] args) { ServerSocket serverSocket=null; try{ serverSocket=new ServerSocket ( 8080 ); System.out.println("启动服务:监听端口:8080"); while (true){ Socket socket = serverSocket.accept(); //连接阻塞 System.out.println("客户端:" + socket.getPort()); //IO变成了异步执行 executorService.submit ( new SocketThread (socket) ); } }catch (IOException e){ e.printStackTrace (); }finally { if (serverSocket!=null){ try { serverSocket.close (); }catch (IOException e){ e.printStackTrace (); } } } } }
public class SocketThread implements Runnable{ private Socket socket; public SocketThread(Socket socket) { this.socket=socket; } @Override public void run() { try { //inputstream是阻塞的(***) BufferedReader bufferedReader = new BufferedReader(new InputStreamReader (socket.getInputStream())); //表示获取客户端的请求报文 String clientStr = bufferedReader.readLine(); System.out.println("收到客户端发送的消息:" + clientStr); BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter (socket.getOutputStream())); bufferedWriter.write("receive a message:" + clientStr + "\n"); bufferedWriter.flush(); }catch (Exception e){ e.printStackTrace (); }finally { } } }
传统 I/O 数据拷贝:
以读操作为例:当应用程序执行 read 系统调用读取文件描述符(FD)的时候,如果这块数据已经存在于用户进程的页内存中,就直接从内存中读取数据。如果数据不存在,则先将数据从磁盘加载数据到内核缓冲区中,再从内核缓冲区拷贝到用户进程的页内存中。(两次拷贝,两次 user 和 kernel 的上下文切换)。
I/O 的阻塞到底阻塞在哪里?Blocking I/O:
当使用 read 或 write 对某个文件描述符进行过读写时,如果当前 FD 不可读,系统就不会对其他的操作做出响应。从设备复制数据到内核缓冲区是阻塞的,从内核缓冲区拷贝到用户空间,也是阻塞的,直到 copy complete,内核返回结果,用户进程才解除block 的状态。
为了解决阻塞的问题,我们有几个思路。
- 在服务端创建多个线程或者使用线程池,但是在高并发的情况下需要的线程会很多,系统无法承受,而且创建和释放线程都需要消耗资源。
- 由请求方定期轮询,在数据准备完毕后再从内核缓存缓冲区复制数据到用户空间(非阻塞式 I/O),这种方式会存在一定的延迟。
能不能用一个线程处理多个客户端请求?
NIO非阻塞IO:使用多线程的方式来解决这个问题,仍然有一个缺点,线程的数量取决于硬件配置,所以线程数量是有 限的,如果请求量比较大的时候,线程本身会收到限制从而并发量也不会太高。那怎么办呢,我们可以 采用非阻塞IO。 NIO 从JDK1.4 提出的,本意是New IO,它的出现为了弥补原本IO的不足,提供了更高效的方式,提出 一个通道(channel)的概念,在IO中它始终以流的形式对数据的传输和接受,下面我们演示一下NIO 的使用。 所谓的NIO(非阻塞IO),其实就是取消了IO阻塞和连接阻塞,当服务端不存在阻塞的时候,就可以不 断轮询处理客户端的请求,如图所示,表示NIO下的运行流程。
上述这种NIO的使用方式,仍然存在一个问题,就是客户端或者服务端需要通过一个线程不断轮询才能 获得结果,而这个轮询过程中会浪费线程资源。public class NIOServerSocket { public static void main(String[] args) { try { ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); //设置连接非阻塞 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); while(true){ //是非阻塞的 SocketChannel socketChannel=serverSocketChannel.accept(); //获得一个客户端连接 // socketChannel.configureBlocking(false);//IO非阻塞 if(socketChannel!=null){ ByteBuffer byteBuffer=ByteBuffer.allocate(1024); int i=socketChannel.read(byteBuffer); Thread.sleep(10000); byteBuffer.flip(); //反转 socketChannel.write(byteBuffer); }else{ Thread.sleep(1000); System.out.println("连接位就绪"); } } } catch (IOException | InterruptedException e) { e.printStackTrace(); } } }
大家站在全局的角度再思考一下整个过程,有哪些地方可以优化呢?
NIO多路复用机制:- I/O 指的是网络 I/O。
- 多路指的是多个 TCP 连接(Socket 或 Channel)。
- 复用指的是复用一个或多个线程。
它的基本原理就是不再由应用程序自己监视连接,而是由内核替应用程序监视文件描述符。客户端在操作的时候,会产生具有不同事件类型的 socket。在服务端,I/O 多路复用程序(I/O Multiplexing Module)会把消息放入队列中,然后通过文件事件分派器(Fileevent Dispatcher),转发到不同的事件处理器中。
多路复用有很多的实现,以 select 为例,当用户进程调用了多路复用器,进程会被阻塞。内核会监视多路复用器负责的所有 socket,当任何一个 socket 的数据准备好了,多路复用器就会返回。这时候用户进程再调用 read 操作,把数据从内核缓冲区拷贝到用户空间。
所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪(readable)状态,select()函数就可以返回。Redis 的多路复用, 提供了 select, epoll, evport, kqueue 几种选择,在编译的时候来选择一种。
- evport 是 Solaris 系统内核提供支持的;
- epoll 是 LINUX 系统内核提供支持的;
- kqueue 是 Mac 系统提供支持的;
- select 是 POSIX 提供的,一般的操作系统都有支撑(保底方案);
我们看到 NIOClientSocket中下面这段代码,当客户端通过 read 方法去读取服务端返回的数据时,如果 此时服务端数据未准备好,对于客户端来说就是一次无效的轮询。
while(true) { int i = socketChannel.read(byteBuffer); if (i > 0) { System.out.println("收到服务端的数据:" + new String(byteBuffer.array())); } else { System.out.println("服务端数据未准备好"); Thread.sleep(1000); } }
我们能不能够设计成,当客户端调用 read 方法之后,不仅仅不阻塞,同时也不需要轮询。而是等到服 务端的数据就绪之后, 告诉客户端。然后客户端再去读取服务端返回的数据呢?就像点外卖一样,我们在网上下单之后,继续做其他事情,等到外卖到了公司,外卖小哥主动打 电话告诉你,你直接去前台取餐即可。
所以为了优化这个问题,引入了多路复用机制。
I/O多路复用的本质是通过一种机制(系统内核缓冲I/O数据),让单个进程可以监视多个文件描述符, 一旦某个描述符就绪(一般是读就绪或写就绪),能够通知程序进行相应的读写操作
什么是fd:在linux中,内核把所有的外部设备都当成是一个文件来操作,对一个文件的读写会调 用内核提供的系统命令,返回一个fd(文件描述符)。而对于一个socket的读写也会有相应的文件描 述符,成为socketfd。
常见的IO多路复用方式有【select、poll、epoll】,都是Linux API提供的IO复用方式,那么接下来重 点讲一下select、和epoll这两个模型
-
select:进程可以通过把一个或者多个fd传递给select系统调用,进程会阻塞在select操作上,这 样select可以帮我们检测多个fd是否处于就绪状态,这个模式有两个缺点 由于他能够同时监听多个文件描述符,假如说有1000个,这个时候如果其中一个fd 处于就绪 状态了,那么当前进程需要线性轮询所有的fd,也就是监听的fd越多,性能开销越大。 同时,select在单个进程中能打开的fd是有限制的,默认是1024,对于那些需要支持单机上 万的TCP连接来说确实有点少
-
epoll:linux还提供了epoll的系统调用,epoll是基于事件驱动方式来代替顺序扫描,因此性能相 对来说更高,主要原理是,当被监听的fd中,有fd就绪时,会告知当前进程具体哪一个fd就绪,那 么当前进程只需要去从指定的fd上读取数据即可,另外,epoll所能支持的fd上线是操作系统的最 大文件句柄,这个数字要远远大于1024
由于epoll能够通过事件告知应用进程哪个fd是可读的,所以我们也称这种IO为异步非阻塞IO, 当然它是伪异步的,因为它还需要去把数据从内核同步复制到用户空间中,真正的异步非阻塞, 应该是数据已经完全准备好了,我只需要从用户空间读就行.
同步和异步,指的是用户线程和内核的交互方式,阻塞和非阻塞,指用户线程调用内核IO操作的方式是阻塞还是非阻塞就像在Java中使用多线程做异步处理的概念,通过多线程去执行一个流程,主线程可以不用等待。而阻塞和非阻塞我们可以理解为假如在同步流程或者异步流程中做IO操作,如果缓冲区数据还没准备好,IO的这个过程会阻塞。I/O多路复用的好处是可以通过把多个I/O的阻塞复用到同一个select的阻塞上,从而使得系统在单线程 的情况下可以同时处理多个客户端请求。它的最大优势是系统开销小,并且不需要创建新的进程或者线 程,降低了系统的资源开销,它的整体实现思想如下图所示。
客户端请求到服务端后,此时客户端在传输数据过程中,为了避免Server端在read客户端数据过程中阻 塞,服务端会把该请求注册到Selector复路器上,服务端此时不需要等待,只需要启动一个线程,通过 selector.select()阻塞轮询复路器上就绪的channel即可。
也就是说,如果某个客户端连接数据传输完 成,那么select()方法会返回就绪的channel,然后执行相关的处理即可。
代码如下:
public class NIOSelectorServerSocket implements Runnable{ Selector selector; ServerSocketChannel serverSocketChannel; public NIOSelectorServerSocket(int port) throws IOException { selector=Selector.open(); serverSocketChannel=ServerSocketChannel.open(); //如果采用selector模型,必须要设置非阻塞 serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } @Override public void run() { while(!Thread.interrupted()){ try { selector.select(); //阻塞等待事件就绪 Set selected=selector.selectedKeys(); //事件列表 Iterator it=selected.iterator(); while(it.hasNext()){ //说明有连接进来 dispatch((SelectionKey) it.next()); it.remove();//移除当前就绪的事件 } } catch (IOException e) { e.printStackTrace(); } } } private void dispatch(SelectionKey key) throws IOException { if(key.isAcceptable()){ //是连接事件? register(key); }else if(key.isReadable()){ //读事件 read(key); }else if(key.isWritable()){ //写事件 //TODO } } private void register(SelectionKey key) throws IOException { ServerSocketChannel channel= (ServerSocketChannel) key.channel(); //客户端连接 SocketChannel socketChannel=channel.accept(); //获得客户端连接 socketChannel.configureBlocking(false); socketChannel.register(selector,SelectionKey.OP_READ); } private void read(SelectionKey key) throws IOException { //得到的是socketChannel SocketChannel channel= (SocketChannel) key.channel(); ByteBuffer byteBuffer=ByteBuffer.allocate(1024); channel.read(byteBuffer); System.out.println("Server Receive Msg:"+new String(byteBuffer.array())); } public static void main(String[] args) throws IOException { NIOSelectorServerSocket selectorServerSocket=new NIOSelectorServerSocket(8080); new Thread(selectorServerSocket).start(); } }
事实上NIO已经解决了上述BIO暴露的下面两个问题:
-
同步阻塞IO,读写阻塞,线程等待时间过长。
-
在制定线程策略的时候,只能根据CPU的数目来限定可用线程资源,不能根据连接并发数目来制 定,也就是连接有限制。否则很难保证对客户端请求的高效和公平。
到这里为止,通过NIO的多路复用机制,解决了IO阻塞导致客户端连接处理受限的问题,服务端只需要 一个线程就可以维护多个客户端,并且客户端的某个连接如果准备就绪时,会通过事件机制告诉应用程 序某个channel可用,应用程序通过select方法选出就绪的channel进行处理。
单线程Reactor 模型(高性能I/O设计模式):了解了NIO多路复用后,就有必要再和大家说一下Reactor多路复用高性能I/O设计模式,Reactor本质 上就是基于NIO多路复用机制提出的一个高性能IO设计模式。
它的核心思想是把响应IO事件和业务处理 进行分离,通过一个或者多个线程来处理IO事件,然后将就绪得到事件分发到业务处理handlers线程去步非阻塞处理,如下图所示。 Reactor模型有三个重要的组件:
- Reactor :将I/O事件发派给对应的Handler
- Acceptor :处理客户端连接请求
- Handlers :执行非阻塞读/写
代码实现如下:
Reactor:
public class Reactor implements Runnable { private final Selector selector; private final ServerSocketChannel serverSocketChannel; public Reactor(int port) throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(selector, serverSocketChannel)); } @Override public void run() { while (!Thread.interrupted()) { try { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { dispatch(iterator.next()); iterator.remove(); } } catch (IOException e) { e.printStackTrace(); } } } private void dispatch(SelectionKey key) { //可能拿到的对象有两个 // Acceptor // Handler Runnable runnable = (Runnable) key.attachment(); if (runnable != null) { runnable.run(); // } } }
Acceptor:
public class Acceptor implements Runnable { private final Selector selector; private final ServerSocketChannel serverSocketChannel; public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) { this.selector = selector; this.serverSocketChannel = serverSocketChannel; } @Override public void run() { SocketChannel channel; try { channel = serverSocketChannel.accept();//得到一个客户端连接 System.out.println(channel.getRemoteAddress() + ":收到一个客户端连接"); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ, new Handler(channel)); } catch (IOException e) { e.printStackTrace(); } } }
Handler :
public class Handler implements Runnable { SocketChannel channel; public Handler(SocketChannel channe) { this.channel = channe; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "------"); ByteBuffer buffer = ByteBuffer.allocate(1024); int len = 0, total = 0; String msg = ""; try { do { len = channel.read(buffer); if (len > 0) { total += len; msg += new String(buffer.array()); } } while (len > buffer.capacity()); System.out.println("total:" + total); //msg=表示通信传输报文 //耗时2s //登录: username:password //ServetRequets: 请求信息 //数据库的判断 //返回数据,通过channel写回到客户端 System.out.println(channel.getRemoteAddress() + ": Server receive Msg:" + msg); } catch (Exception e) { e.printStackTrace(); } finally { if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
以上代码码是最基本的单Reactor单线程模型(整体的I/O操作是由同一个线程完成的)。
其中Reactor线程,负责多路分离套接字,有新连接到来触发connect 事件之后,交由Acceptor进行处 理,有IO读写事件之后交给hanlder 处理。
Acceptor主要任务就是构建handler ,在获取到和client相关的SocketChannel之后 ,绑定到相应的 hanlder上,对应的SocketChannel有读写事件之后,基于racotor 分发,hanlder就可以处理了(所有的 IO事件都绑定到selector上,有Reactor分发)。
Reactor 模式本质上指的是使用 I/O 多路复用(I/O multiplexing) + 非阻塞 I/O(nonblocking I/O) 的模式。
多线程单Reactor模型:单线程Reactor这种实现方式有存在着缺点,从实例代码中可以看出,handler的执行是串行的,如果其 中一个handler处理线程阻塞将导致其他的业务处理阻塞。由于handler和reactor在同一个线程中的执 行,这也将导致新的无法接收新的请求,我们做一个小实验:
- 在上述Reactor代码的DispatchHandler的run方法中,增加一个Thread.sleep()。
- 打开多个客户端窗口连接(可以通过telnet连接)到Reactor Server端,其中一个窗口发送一个信息后被阻塞,另外一个窗 口再发信息时由于前面的请求阻塞导致后续请求无法被处理。
Redis6.0中多线程带来的性能提升。Redis中的特殊的多线程单Reactor模型。下图是美团技术团队使用阿里云服务器压测GET/SET命令在4个线程IO时性能上的对比结果,可以明显 的看到,Redis 在使用多线程模式之后性能大幅提升,达到了一倍。
Redis Server 阿里云 Ubuntu 18.04 , 8CPU 2.5GHZ,8G内存,主机型号: ecs.ic5.2xlarge Redis Benchmark client: 阿里云 Unbuntu 18.04 , 8CPU 2.5GHZ,8G内存,主机型号: ecs.ic5.2xlarge
为了解决这种问题,有人提出使用多线程的方式来处理业务,也就是在业务处理的地方加入线程池异步 处理,将reactor和handler在不同的线程来执行,如下图所示。
改造代码代码如下:
public class MutilDispatchHandler implements Runnable { SocketChannel channel; private Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public MutilDispatchHandler(SocketChannel channel) { this.channel = channel; } @Override public void run() { processor(); } private void processor() { executor.execute(new ReaderHandler(channel)); } static class ReaderHandler implements Runnable { private SocketChannel channel; public ReaderHandler(SocketChannel channel) { this.channel = channel; } @Override public void run() { System.out.println(Thread.currentThread().getName() + ":-----"); ByteBuffer buffer = ByteBuffer.allocate(1024); try { Thread.sleep(100000); } catch (InterruptedException e) { e.printStackTrace(); } int len = 0, total = 0; String msg = ""; try { do { len = channel.read(buffer); if (len > 0) { total += len; msg += new String(buffer.array()); } } while (len > buffer.capacity()); System.out.println("total:" + total); //msg=表示通信传输报文 //耗时2s //登录: username:password //ServetRequets: 请求信息 //数据库的判断 //返回数据,通过channel写回到客户端 System.out.println(channel.getRemoteAddress() + ": Server receive Msg:" + msg); } catch (Exception e) { e.printStackTrace(); } finally { if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
在多线程Reactor模型中,添加了一个工作者线程池,并将非I/O操作从Reactor线程中移出转交给工作 者线程池来执行。这样能够提高Reactor线程的I/O响应,不至于因为一些耗时的业务逻辑而延迟对后面 I/O请求的处理。
主从多线程多Reactor模式:
mainReactor负责监听连接,accept连接给subReactor处理,为什么要单独分一个Reactor来处理监听呢?因为像TCP这样需要经过3次握手才能建立连接,这个建立连接的过程也是要耗时间和资源的,单独分一个Reactor来处理,可以提高性能。
代码实现如下:
Acceptor
public class Acceptor implements Runnable{ final Selector sel; final ServerSocketChannel serverSocketChannel; //获取当前核心数 private final int POOL_SIZE=Runtime.getRuntime().availableProcessors(); //线程池 private Executor subReactorExecutor= Executors.newFixedThreadPool(POOL_SIZE); //subReactors 个数 private Reactor[] subReactors=new Reactor[POOL_SIZE]; //任务分发轮询分发计数 int handerNext=0; public Acceptor(Selector sel,int port) throws IOException { this.sel=sel;//打开链接 this.serverSocketChannel=ServerSocketChannel.open(); this.serverSocketChannel.socket().bind(new InetSocketAddress(port)); this.serverSocketChannel.configureBlocking(false); this.serverSocketChannel.register(this.sel, SelectionKey.OP_ACCEPT,this); init(); System.out.println("Main Reactor Acceptor: Listening on port:"+port); } private void init() throws IOException { //初始化subReactors for (int i = 0; i < subReactors.length; i++) { subReactors[i]=new Reactor(); subReactorExecutor.execute(subReactors[i]); } } @Override public void run() { //负责处理连接事件和IO事件 try { SocketChannel socketChannel=serverSocketChannel.accept(); //获取连接 if(socketChannel!=null){ socketChannel.write(ByteBuffer.wrap("Multiply Reactor Patterm\r\nreactor> ".getBytes())); System.out.println(Thread.currentThread().getName()+": Main-Reactor-Acceptor:"+socketChannel.getLocalAddress()+"连接"); //轮询分发 Reactor subReactor=subReactors[handerNext]; //异步处理 subReactor.register(new AsyncHandler(socketChannel)); if(++handerNext==subReactors.length){ handerNext=0; } } } catch (IOException e) { e.printStackTrace(); } } }
Reactor:
public class Reactor implements Runnable{ private final Selector selector; //任务队列 private ConcurrentLinkedQueue<AsyncHandler> events=new ConcurrentLinkedQueue<>(); public Reactor() throws IOException { //打开selector this.selector = Selector.open(); } public Selector getSelector() { return selector; } @Override public void run() { while(!Thread.interrupted()){ AsyncHandler handler; try { //阻塞。获取任务队列,为空的时候会阻塞 while((handler=events.poll())!=null){ handler.getChannel().configureBlocking(false); SelectionKey selectionKey=handler.getChannel().register(selector,SelectionKey.OP_READ); selectionKey.attach(handler); handler.setSk(selectionKey); } //获取到了任务队列,此刻进入轮询监听IO事件 selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while(iterator.hasNext()){ SelectionKey key=iterator.next(); Runnable runnable=(Runnable) key.attachment(); //得到AsyncHandler实例 if(runnable!=null){ //调用AsyncHandler 处理任务 runnable.run(); } iterator.remove(); } } catch (IOException e) { e.printStackTrace(); } } } public void register(AsyncHandler handler){ events.offer(handler); //有一个事件注册,添加任务队列 selector.wakeup();//唤醒阻塞selector } }
AsyncHandler:
public class AsyncHandler implements Runnable{ private SocketChannel channel; private SelectionKey sk; StringBuilder stringBuilder=new StringBuilder(); ByteBuffer inputBuffer=ByteBuffer.allocate(1024); ByteBuffer outputBuffer=ByteBuffer.allocate(1024); public AsyncHandler(SocketChannel channel) { this.channel = channel; } public SocketChannel getChannel() { return channel; } public SelectionKey getSk() { return sk; } public void setSk(SelectionKey sk) { this.sk = sk; } @Override public void run() { try { if (sk.isReadable()) { read(); } else if (sk.isWritable()) { write(); } }catch (Exception e){ } } private void read() throws IOException { inputBuffer.clear(); int n=channel.read(inputBuffer); if(inputBufferComplete(n)){ System.out.println(Thread.currentThread().getName()+": Server端收到客户端的请求消息:"+stringBuilder.toString()); outputBuffer.put(stringBuilder.toString().getBytes(StandardCharsets.UTF_8)); this.sk.interestOps(SelectionKey.OP_WRITE); } } private boolean inputBufferComplete(int bytes) throws EOFException { if(bytes>0){ inputBuffer.flip(); while(inputBuffer.hasRemaining()){ byte ch=inputBuffer.get(); //得到输入的字符 if(ch==3) { //表示Ctrl+c throw new EOFException(); }else if(ch=='\r'||ch=='\n'){ return true; }else { stringBuilder.append((char)ch); } } }else if(bytes==1){ throw new EOFException(); } return false; } private void write() throws IOException { int write=-1; outputBuffer.flip(); if(outputBuffer.hasRemaining()){ write=channel.write(outputBuffer); //把收到的数据写回到客户端 } outputBuffer.clear(); stringBuilder.delete(0,stringBuilder.length()); if(write<=0){ this.sk.channel().close(); }else{ channel.write(ByteBuffer.wrap("\r\nreactor> ".getBytes())); this.sk.interestOps(SelectionKey.OP_READ);//又转化为读事件 } } }
MultiplyReactor:
public class MultiplyReactor { private int port; private Reactor mainReactor; //main Reactor Executor mainReactorExecutor= Executors.newFixedThreadPool(10); public MultiplyReactor(int port) throws IOException { this.port = port; mainReactor=new Reactor(); } public void start() throws IOException { new Acceptor(mainReactor.getSelector(),port); mainReactorExecutor.execute(mainReactor); } public static void main(String[] args) throws IOException { new MultiplyReactor(8080).start(); } }
父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。
这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持。
源码地址:https://gitee.com/TongHuaShuShuoWoDeJieJu/redis.git
这短短的一生我们最终都会失去,不妨大胆一点,爱一个人,攀一座山,追一个梦