RPC远程协议之Thrift非阻塞多线程处理
在上一篇文章《RPC远程协议之Thrift入门》中,我已经介绍了Thrift的基本使用,并举例验证了如何使用,当然这个例子是阻塞单线程的模式,实际客户端请求增多,单线程处理时间变长时,无可厚非就会卡顿延迟,所以需要实现非阻塞多线程的通信模式,而Thrift已经想到,并且为我们提供好了几种Server的实现方式,以供在实际业务需求下做出合适的选择。本篇文章主要介绍Java版本的几种Server实现的异同,并对上篇文章的例子做出改进,使其支持非阻塞并且是多线程的。
l 模块化结构
l 几种Server版本
l 例子验证
一、模块化结构
引用Thrift官网的结构栈如下:
如上图所示,Thrift含有四个主要的组件:Server、Processor、Protocol,以及Transport。其中,protocol定义了消息是按照什么格式序列化的,transport则定义了消息是怎样在客户端和服务端传递的,processor则是由编译工具自动生成的业务处理器,而server则负责从transport接收序列化消息,并根据protocol反序列化它,调用用户自定义的消息处理器,并序列化消息处理器的响应,最后再将其写会transport返回给客户端。
因为Thrift的模块化结构,使得它可以提供多种server实现方式,来满足不同业务需求,比如:
1. TSimpleServer
2. TNonblockingServer
3. THsHaServer
4. TThreadedSelectorServer
5. TThreadPoolServer
这几种不同的Server版本的异同会在下面介绍,请往下俯瞰。
二、几种Server版本
1、TSimpleServer
接收一个客户端的连接,并处理连接请求,直到客户端关闭连接,它才会接收新的请求连接。所以,它只在一个单独的线程中以阻塞方式完成工作,所以它只能同时服务于一个客户端连接,其它所有客户端在被服务端接受前都在排队等待。我们上一篇文章的服务端实现,就是采用这种方式来演示的。
PS:其主要用于测试使用,一般不在生产环境使用。
2、TNonblockingServer
顾名思义,TNonblockingServer为非阻塞的实现方式,解决了TSimpleServer同时只能提供一个连接而阻塞其它所有客户端连接的问题。实际上,其使用了java.nio.channels.Selector,通过调用select(),可以使连接阻塞在多个连接点上,而不是阻塞在单一连接上。当一个或多个连接准备好接收/读写时,select()便会返回。
TNonblockingServer在处理这些连接时,要么接受它,要么从他们那读取数据,要么写入数据到它们,然后再调用select()来等待下一个可用的连接。所以,此种方式的服务端可以同时服务多个客户端,而不会出现一个客户端把其它客户端阻塞的情况。
3、THsHaServer
TNonblockingServer解决了同时可以为多个客户端提供服务,但是也存在问题是:所有消息被调用了select()的同一个线程处理,如果有10个客户端,处理每条消息需要100毫秒,那么延迟和吞吐量会怎样?当一个消息被处理,其它9个客户端就等着被select,所以客户端需等待1秒才能从服务端得到响应,吞吐里就是10次/秒,那要是同时可以处理多条消息,是不是100次/秒,这可好?
THsHaServer解决了上面的问题:它使用单一线程处理网络IO,一个独立的worker线程池来处理消息,只要有空闲的worker线程,消息就会立即被处理,因此多条消息可被并行处理。
4、TThreadedSelectorServer
与THsHaServer主要不同的地方是:TThreadedSelectorServer允许用多个线程来处理网络IO,它维护了两个线程池子,一个用来处理网络IO,一个用来处理请求消息。所以,当网络IO出现瓶颈时,其要比THsHaServer表现更好。而本篇的例子也是采用此种方式来实现服务端,具体查看第三部分。
5、TThreadPoolServer
与其它版本Server不同的是:提供一个专门线程来接受连接,一旦接受到连接,就将其放入到ThreadPoolExecutor中的一个worker线程里处理。而这个worker线程被绑定到特定的客户端连接上,直到客户端关闭,这个worker线程才会被放回线程池中,供其他客户端使用。另外,我们可以灵活配置线程池的大小,当然,如果客户端数量超过了线程池中的最大线程数,在有一个worker线程可用之前,请求将被一直阻塞在那里。
如果你提前知道了将要连接到你服务器上的客户端数量,并且你不介意运行大量线程的话,TThreadPoolServer对你可能是个很好的选择。
我认为TThreadedSelectorServer对大多数案例来说都是个安全之选。如果你的系统资源允许运行大量并发线程的话,你可能会考虑使用TThreadPoolServer。
三、例子验证
本篇文章继续以上一篇文章中的例子介绍下,在Thrift框架中,如何实现非阻塞多线程的C/S通信方式。当然,这里我们只对UserServer和UserClient进行升级重构,具体代码的意思,请查看代码中的注释说明,不再赘述,具体如下:
1、服务端(UserServer.java)
/**
* 将业务处理逻辑UserHandler作为具体的业务
* 处理器,传递给Thrift服务器,执行处理逻辑.
* 服务端实现方式采用:
* TThreadedSelectorServer
*/
public class UserServer {
private static final int port = 9081;
private void invoke() {
try {
// 非阻塞式的Transport,配合TFrameTransport使用
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
// 关联处理器Processor与Service服务的实现
TProcessor processor = new UserService.Processor<UserService.Iface>(new UserHandler());
// 采用TThreadedSelectorServer.Args配置可并发处理客户端请求
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(serverTransport);
// 指定业务处理器processor
args.processor(processor);
// 设定协议工厂,指定高效的、密集的二进制编码格式传输
args.protocolFactory(new TCompactProtocol.Factory());
// 设定传输工厂,指定非阻塞、按块的大小传输,类似NIO
args.transportFactory(new TFramedTransport.Factory());
// 设定处理器工厂,指定只返回一个单例实例
args.processorFactory(new TProcessorFactory(processor));
// 指定多个线程,负责客户端的网络IO处理
args.selectorThreads(2);
// 指定worker线程池的大小
ExecutorService pool = Executors.newFixedThreadPool(3);
args.executorService(pool);
// 采用TThreadedSelectorServer方式实现服务端
TThreadedSelectorServer server = new TThreadedSelectorServer(args);
System.out.println("Starting server on port:" + port + " ...");
server.serve();
} catch(TTransportException e) {
e.printStackTrace();
}
}
public static void main(String[]args) {
UserServer us = new UserServer();
us.invoke();
}
}
2、客户端(UserClient.java)
/**
* 远程调用服务接口,获取用户信息及订单列表
* 这里采用非阻塞的传输方式
*/
public class UserClient {
private static final int port = 9081;
private static final String addr = "localhost";
private static final int timeout = 100 * 1000;
public void start() {
// 非阻塞方式,按块的大小进行传输
TTransport transport = new TFramedTransport(new TSocket(addr,port,timeout));
// 采用高效率的、密集的二进制格式进行数据传输协议
TProtocol protocol = new TCompactProtocol(transport);
UserService.Client client = new UserService.Client(protocol);
try {
open(transport);
User user = client.getUserOrders(10000021);
System.out.println(user); // 打印返回结果
close(transport);
} catch (TException e) {
e.printStackTrace();
}
}
private void open(TTransport transport) {
if(null != transport && !transport.isOpen()) {
try {
transport.open();
} catch(TTransportException e) {
e.printStackTrace();
}
}
}
private void close(TTransport transport) {
if(null != transport && transport.isOpen()) {
transport.close();
}
}
public static void main(String[] args) {
UserClient client = new UserClient();
client.start();
}
}
好了,就介绍到这里,至于运行的结果,与上篇的相同,这里就不列出来了。