前言
之前总结过NIO的三个组件其实对于Netty来说如果要理解后续的编解码器已经实际的Netty应用场景还有几个Netty的组件需要了解。这篇博客依旧参考《Netty、Redis、Zookeeper高并发实战》一书进行总结。
channel
Netty中channel是核心概念表示一个网络连接通道类似于网络的socketNetty关于channel有一个顶层冲虚向类——AbstractChannel其中有一个构造函数如下
/*** Creates a new instance.** param parent* the parent of this channel. {code null} if theres no parent.*/protected AbstractChannel(Channel parent) {this.parent parent;//父通道id newId();unsafe newUnsafe();pipeline newChannelPipeline();//每一个通道其实都维护一个类似于流水线的东西这个后面会总结}
这里有两个属性需要说明一下一个是parent另一个是pipeline
parent表示通道的父通道对于连接建立的通道比如NioServerScoketChannel来说其父通道为null。而对于每一条传输数据的通道比如NioSocketChannel其父通道就是建立连接的通道。
如果简单一点理解channel就是连接网络的通道。
EmbeddedChannel
这里说一下EmbeddedChannel我们在开发Netty的时候大部分时候都是在开发Inbound入站处理器开发完成之后如果需要测试相关逻辑我们需要将处理器加入到通道流水线中然后启动服务端和客户端。每次修改一个业务都要启动服务端和客户端这个过程非常繁琐并且浪费时间。于是Netty就给我们提供了一个专用的通道——EmbeddedChannel。
Handler
在经典的反应堆模式中我们知道每一个注册在Selector上的网络事件都会被分发给指定的handler去处理在Netty中也是一样Netty中的Handler就是处理不同的网络事件。针对入站和出站有着不同的处理器。
入站处理——数据流动方向为Netty的内部channel到ChannelInboundHandler。
出站处理——数据流动方向为ChannelOutboundHandler到Netty的channel。
但是实际过程中Netty给我们提供了很多默认的handler实现一般不需要我们直接去继承ChannelHandler毕竟ChannelHandler是一个接口如果我们每次新增一个Handler都需要实现ChannelHandler中的每个方法这样很麻烦于是Netty就提供了一个适配器提供了ChannelHandler的默认实现更多的时候我们可以直接继承ChannelHandlerAdapter即可。具体的类结构如下所示
ChannelInitializer通道初始化处理器
其实每一条channel都有一条Handler业务处理流水线这就是我们后续要总结的pipeline 但是负责装配自己的Handler处理器这个工作是发生在通道开始工作之前。通道的初始化类就可以向流水线中装配业务处理类。
serverBootstrap.childHandler(new ChannelInitializer() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(requestLength));socketChannel.pipeline().addLast(new FixLengthTimeServerHandler());}});
上述的代码中就在SocketChannel初始化的时候增加了两个处理器。
pipeline
Netty的业务处理器流水线ChannelPipeline是基于责任链设计模式来设计的。针对每一个channel都会有一个业务流水线与其绑定这个流水线中包含了所有的入站和出站处理器这个业务流水线就是pipelinepipelinechannelChannelHandler三者的关系如下所示。
每个处理器被加入到pipeline的时候都会有一个ChannelHandlerContext与之绑定。
总体来说Channel、Handler、pipeline、ChannelHandlerContext几者的关系为Channel通道拥有一个ChannelPipeline流水线每个流水线节点为一个ChannelHandlerContext通道处理器上下对象每一个上下文中包含了一个ChannelHandler处理器。在ChannelHandler通道处理器的入站/出站处理方法中Netty会传递ChannelHandlerContext通过ChannelHandlerContext可以获取pipeline的实例或者Channel的实例。
流水线的传递与截断
Inbound的事件处理顺序
在每次初始化增加channel处理器的时候这些处理器会自动成为pipeline中的节点这里直接上实例代码
/*** autor:liman* createtime:2020/9/19* comment:simpleInHandlerA*/Slf4jpublic class SimpleInHandlerA extends ChannelInboundHandlerAdapter {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("this is A simpleInHandler read methodmessage:{}",msg);super.channelRead(ctx, msg);}}/*** autor:liman* createtime:2020/9/19* comment:simpleInHandlerB*/Slf4jpublic class SimpleInHandlerB extends ChannelInboundHandlerAdapter {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("this is B simpleInHandler read method message:{}",msg);super.channelRead(ctx, msg);}}/*** autor:liman* createtime:2020/9/19* comment:simpleInHandlerC*/Slf4jpublic class SimpleInHandlerC extends ChannelInboundHandlerAdapter {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("this is C simpleInHandler read method message:{}",msg);super.channelRead(ctx, msg);}}
使用EmbeddedChannel进行测试
/*** autor:liman* createtime:2020/9/19* comment: InboundHandler的处理器测试*/Slf4jpublic class TestPipelineInbound {public static void main(String[] args) {ChannelInitializer channelInitializer new ChannelInitializer() {Overrideprotected void initChannel(EmbeddedChannel ch) throws Exception {ch.pipeline().addLast(new SimpleInHandlerA());ch.pipeline().addLast(new SimpleInHandlerB());ch.pipeline().addLast(new SimpleInHandlerC());}};EmbeddedChannel embeddedChannel new EmbeddedChannel(channelInitializer);ByteBuf buf Unpooled.buffer();buf.writeInt(10);embeddedChannel.writeInbound(10);//向通道中写一个报文}}
运行结果
可以看到调用顺序为A->B->C
Outbound的事件处理顺序
3个OutboundHandler的业务代码
/*** autor:liman* createtime:2020/9/19* comment:SimpleOutHandlerA*/Slf4jpublic class SimpleOutHandlerA extends ChannelOutboundHandlerAdapter {Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("出站处理器Athis is simple out handler Awrite method{}",msg);super.write(ctx, msg, promise);}}/*** autor:liman* createtime:2020/9/19* comment:*/Slf4jpublic class SimpleOutHandlerB extends ChannelOutboundHandlerAdapter {Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("出站处理器Bthis is simple out handler Bwrite method{}",msg);super.write(ctx, msg, promise);}}/*** autor:liman* createtime:2020/9/19* comment:*/Slf4jpublic class SimpleOutHandlerC extends ChannelOutboundHandlerAdapter {Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.info("出站处理器Cthis is simple out handler Cwrite method{}",msg);super.write(ctx, msg, promise);}}
测试代码
/*** autor:liman* createtime:2020/9/19* comment:*/Slf4jpublic class TestPipelineOutBound {public static void main(String[] args) {ChannelInitializer channelInitializer new ChannelInitializer() {Overrideprotected void initChannel(EmbeddedChannel ch) throws Exception {ch.pipeline().addLast(new SimpleOutHandlerA());ch.pipeline().addLast(new SimpleOutHandlerB());ch.pipeline().addLast(new SimpleOutHandlerC());}};EmbeddedChannel embeddedChannel new EmbeddedChannel(channelInitializer);ByteBuf buf Unpooled.buffer();buf.writeInt(10);embeddedChannel.writeOutbound(10);//向通道中写一个报文}}
运行结果
可以看到调用顺序是C->B->A。
如果需要截断流水线的处理只需要在Handler中去掉调用super的read/write方法。具体实例可以直接注释相关代码然后测试即可。
总结
简单总结了ChannelChannelHandlerChannelHandlerContextPipeline的关系。同时需要注意的是channelHandler出站和入站处理器的调用方向是不同的。
【本文来源:香港将军澳机房 http://www.558idc.com/hk.html 欢迎留下您的宝贵建议】