当前位置 : 主页 > 大数据 > 区块链 >

RPC远程协议之Thrift非阻塞多线程处理

来源:互联网 收集:自由互联 发布时间:2021-06-22
RPC远程协议之Thrift非阻塞多线程处理 在上一篇文章《RPC远程协议之Thrift入门》中,我已经介绍了Thrift的基本使用,并举例验证了如何使用,当然这个例子是阻塞单线程的模式,实际客户

RPC远程协议之Thrift非阻塞多线程处理

 

在上一篇文章《RPC远程协议之Thrift入门》中,我已经介绍了Thrift的基本使用,并举例验证了如何使用,当然这个例子是阻塞单线程的模式,实际客户端请求增多,单线程处理时间变长时,无可厚非就会卡顿延迟,所以需要实现非阻塞多线程的通信模式,而Thrift已经想到,并且为我们提供好了几种Server的实现方式,以供在实际业务需求下做出合适的选择。本篇文章主要介绍Java版本的几种Server实现的异同,并对上篇文章的例子做出改进,使其支持非阻塞并且是多线程的。

 

 

l  模块化结构

l  几种Server版本

l  例子验证

 

 

一、模块化结构

引用Thrift官网的结构栈如下:

 

如上图所示,Thrift含有四个主要的组件:Server、Processor、Protocol,以及Transport。其中,protocol定义了消息是按照什么格式序列化的,transport则定义了消息是怎样在客户端和服务端传递的,processor则是由编译工具自动生成的业务处理器,而server则负责从transport接收序列化消息,并根据protocol反序列化它,调用用户自定义的消息处理器,并序列化消息处理器的响应,最后再将其写会transport返回给客户端。

因为Thrift的模块化结构,使得它可以提供多种server实现方式,来满足不同业务需求,比如:

1.  TSimpleServer

2.  TNonblockingServer

3.  THsHaServer

4.  TThreadedSelectorServer

5.  TThreadPoolServer

 

这几种不同的Server版本的异同会在下面介绍,请往下俯瞰。

 

 

二、几种Server版本

1、TSimpleServer

接收一个客户端的连接,并处理连接请求,直到客户端关闭连接,它才会接收新的请求连接。所以,它只在一个单独的线程中以阻塞方式完成工作,所以它只能同时服务于一个客户端连接,其它所有客户端在被服务端接受前都在排队等待。我们上一篇文章的服务端实现,就是采用这种方式来演示的。

 

PS:其主要用于测试使用,一般不在生产环境使用。

 

2、TNonblockingServer

顾名思义,TNonblockingServer为非阻塞的实现方式,解决了TSimpleServer同时只能提供一个连接而阻塞其它所有客户端连接的问题。实际上,其使用了java.nio.channels.Selector,通过调用select(),可以使连接阻塞在多个连接点上,而不是阻塞在单一连接上。当一个或多个连接准备好接收/读写时,select()便会返回。

TNonblockingServer在处理这些连接时,要么接受它,要么从他们那读取数据,要么写入数据到它们,然后再调用select()来等待下一个可用的连接。所以,此种方式的服务端可以同时服务多个客户端,而不会出现一个客户端把其它客户端阻塞的情况。

 

3、THsHaServer

TNonblockingServer解决了同时可以为多个客户端提供服务,但是也存在问题是:所有消息被调用了select()的同一个线程处理,如果有10个客户端,处理每条消息需要100毫秒,那么延迟和吞吐量会怎样?当一个消息被处理,其它9个客户端就等着被select,所以客户端需等待1秒才能从服务端得到响应,吞吐里就是10次/秒,那要是同时可以处理多条消息,是不是100次/秒,这可好?

 

THsHaServer解决了上面的问题:它使用单一线程处理网络IO,一个独立的worker线程池来处理消息,只要有空闲的worker线程,消息就会立即被处理,因此多条消息可被并行处理。

 

4、TThreadedSelectorServer

与THsHaServer主要不同的地方是:TThreadedSelectorServer允许用多个线程来处理网络IO,它维护了两个线程池子,一个用来处理网络IO,一个用来处理请求消息。所以,当网络IO出现瓶颈时,其要比THsHaServer表现更好。而本篇的例子也是采用此种方式来实现服务端,具体查看第三部分。

 

5、TThreadPoolServer

与其它版本Server不同的是:提供一个专门线程来接受连接,一旦接受到连接,就将其放入到ThreadPoolExecutor中的一个worker线程里处理。而这个worker线程被绑定到特定的客户端连接上,直到客户端关闭,这个worker线程才会被放回线程池中,供其他客户端使用。另外,我们可以灵活配置线程池的大小,当然,如果客户端数量超过了线程池中的最大线程数,在有一个worker线程可用之前,请求将被一直阻塞在那里。

 

如果你提前知道了将要连接到你服务器上的客户端数量,并且你不介意运行大量线程的话,TThreadPoolServer对你可能是个很好的选择。

 

我认为TThreadedSelectorServer对大多数案例来说都是个安全之选。如果你的系统资源允许运行大量并发线程的话,你可能会考虑使用TThreadPoolServer。

 

 

三、例子验证

本篇文章继续以上一篇文章中的例子介绍下,在Thrift框架中,如何实现非阻塞多线程的C/S通信方式。当然,这里我们只对UserServer和UserClient进行升级重构,具体代码的意思,请查看代码中的注释说明,不再赘述,具体如下:

 

1、服务端(UserServer.java)

/**
 *
将业务处理逻辑UserHandler作为具体的业务
 *
处理器,传递给Thrift服务器,执行处理逻辑.
 *
服务端实现方式采用:
 * TThreadedSelectorServer
 */
public class UserServer {
   
private static final int port = 9081;

    private void
invoke() {
       
try {
           
// 非阻塞式的Transport,配合TFrameTransport使用
           
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
           
// 关联处理器ProcessorService服务的实现
           
TProcessor processor = new UserService.Processor<UserService.Iface>(new UserHandler());

           
// 采用TThreadedSelectorServer.Args配置可并发处理客户端请求
           
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(serverTransport);
           
// 指定业务处理器processor
           
args.processor(processor);
           
// 设定协议工厂,指定高效的、密集的二进制编码格式传输
           
args.protocolFactory(new TCompactProtocol.Factory());
           
// 设定传输工厂,指定非阻塞、按块的大小传输,类似NIO
           
args.transportFactory(new TFramedTransport.Factory());
           
// 设定处理器工厂,指定只返回一个单例实例
           
args.processorFactory(new TProcessorFactory(processor));

           
// 指定多个线程,负责客户端的网络IO处理
           
args.selectorThreads(2);
           
// 指定worker线程池的大小
           
ExecutorService pool = Executors.newFixedThreadPool(3);
           
args.executorService(pool);

           
// 采用TThreadedSelectorServer方式实现服务端
           
TThreadedSelectorServer server = new TThreadedSelectorServer(args);
           
System.out.println("Starting server on port:" + port + " ...");
           
server.serve();
       
} catch(TTransportException e) {
            e.printStackTrace()
;
       
}
    }

   
public static void main(String[]args) {
        UserServer us =
new UserServer();
       
us.invoke();
   
}

}

 

 

 

 

2、客户端(UserClient.java)

/**
 *
远程调用服务接口,获取用户信息及订单列表
 *
这里采用非阻塞的传输方式
 */
public class UserClient {
   
private static final int port = 9081;
    private static final
String addr = "localhost";
    private static final int
timeout = 100 * 1000;

    public void
start() {
       
// 非阻塞方式,按块的大小进行传输
       
TTransport transport = new TFramedTransport(new TSocket(addr,port,timeout));
       
// 采用高效率的、密集的二进制格式进行数据传输协议
       
TProtocol protocol = new TCompactProtocol(transport);

       
UserService.Client client = new UserService.Client(protocol);
        try
{
            open(transport)
;
           
User user = client.getUserOrders(10000021);
           
System.out.println(user);   // 打印返回结果
           
close(transport);
       
} catch (TException e) {
            e.printStackTrace()
;
       
}
    }

   
private void open(TTransport transport) {
       
if(null != transport && !transport.isOpen()) {
           
try {
                transport.open()
;
           
} catch(TTransportException e) {
                e.printStackTrace()
;
           
}
        }
    }

   
private void close(TTransport transport) {
       
if(null != transport && transport.isOpen()) {
            transport.close()
;
       
}
    }

   
public static void main(String[] args) {
        UserClient client =
new UserClient();
       
client.start();
   
}

}

 

 

 

 

 

好了,就介绍到这里,至于运行的结果,与上篇的相同,这里就不列出来了。

网友评论