o文章摘自 netty 官网(netty.io) netty 是一个异步的,事件驱动的网络应用通信框架,可以让我们快速编写可靠,高性能,高可扩展的服务端和客户端 样例一:discard server(丢弃任何消息
o文章摘自 netty 官网(netty.io)
netty 是一个异步的,事件驱动的网络应用通信框架,可以让我们快速编写可靠,高性能,高可扩展的服务端和客户端
-
样例一:discard server(丢弃任何消息的服务端)
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
1.DiscardServerHandler扩展了ChannelInboundHandlerAdapter,它是ChannelInboundHandler的一
个实现。
ChannelInboundHandler提供了可以覆盖的各种事件处理程序方法。 目前,只需扩展
ChannelInboundHandlerAdapter而不是自己实现处理程序接口。
2.我们在这里覆盖channelRead()事件处理程序方法。 每当从客户端接收到新数据时,都会使用收到的
消息调用此方法。 在此示例中,接收消息的类型是ByteBuf。
3.要实现DISCARD协议,处理程序必须忽略收到的消息。
ByteBuf是一个引用计数对象(
ReferenceCounted
),
必须通过
release()方法显式释放。 请记住,处理程序有责任释放传递给处理程序的任何引用计数对象。 通常,
channelRead()处理程序方法的实现方式如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
4.io,或者hadler在处理数据时,可能会导致netty抛出异常,这时会调用exceptionCaught()方法,在大
多数情况下,应该记录捕获的异常并在此处关闭其关联的通道,当然,根据具体情况,也可添加其他逻辑,
比如发送带有错误代码的响应消息。
-
至此我们已经实现了一半discardserver,接下来编写main方法来启动server
package io.netty.example.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
}
1.NioEventLoopGroup是一个处理I / O操作的多线程事件循环。 Netty为不同类型的传输提供各种
EventLoopGroup实现。 我们在此示例中实现了服务器端应用程序,因此将使用两个NioEventLoopGroup。
第一个,通常称为“boss”,接受传入连接。 第二个,通常称为“worker”,一旦boss接受连接并将接受
的连接注册到worker,worker就处理被接受连接的io。 NioEventLoopGroup使用多少个线程以及它们如何
映射到创建的Channels取决于EventLoopGroup实现,也可以通过构造函数进行配置。
2.ServerBootstrap是一个设置服务器的辅助类。
3.在这里,我们指定使用NioServerSocketChannel类,该类用于实例化新Channel以接受传入连接。
4.此处指定的处理程序将始终由新接受的Channel评估(不太懂)。 ChannelInitializer是一个特殊的处理程
序,旨在帮助用户配置新的Channel。 可以将不同的handler 添加到pipeline 中来完成对消息的复杂处理。
5.可以通过option()方法来给channel 来指定参数
6.你注意到option()和childOption()吗? option()用于接受传入连接的NioServerSocketChannel。
childOption()用于父ServerChannel接受的Channels,在这种情况下是NioServerSocketChannel。
7.我们现在准备好了。 剩下的就是绑定到端口并启动服务器。 在这里,我们绑定到机器中所有NIC(网络
接口卡)的端口8080。 您现在可以根据需要多次调用bind()方法(使用不同的绑定地址。)
-
之前是把收到的消息直接丢弃,现在我们把收到的消息写出去
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
1.ChannelHandlerContext对象提供各种操作,使您能够触发各种I / O事件和操作。 在这里,我们调用
write(Object)来逐个写出接收到的消息。 请注意,我们没有release收到的消息,这与我们在DISCARD
示例中的操作不同。
这是因为Netty在写入线路时会为您release。
2.
ctx.write(Object)不会将消息写入线路。 而是存在内部缓存,然后通过
ctx.flush()刷新到线路。 或
者可以将上述两步合二为一,
调用ctx.writeAndFlush(msg)即可
它与前面的示例的不同之处在于,它发送包含32位整数的消息,而不接收任何请求,并在
发送消息后关闭连接。
在此示例中,您将学习如何构造和发送消息,以及在完成时关闭连接。
因为我们将忽略任何接收的数据,但是一旦建立连接就发送消息,这次我们不能使用channelRead()方法。
相反,我们应该覆盖channelActive()方法。 以下是实现:
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1.如上所述,当建立连接并准备进行io时,将调用channelActive()方法。 让我们写一个32位整数来表
示这个方法中的当前时间。
2.要发送新消息,我们需要分配一个包含消息的新缓冲区。 我们要写一个32位整数,因此我们需要一个容
量至少为4个字节的ByteBuf。 通过ChannelHandlerContext.alloc()获取当前的ByteBufAllocator并分
配一个新的缓冲区。
3.flip在哪里?在NIO中发送消息之前,我们不习惯调用java.nio.ByteBuffer.flip()吗?
下面时java.nio.ByteBuffer.flip方法的英文注释:
Flips this buffer. The limit is set to the current position and then the position is set
to zero. If the mark is defined then it is discarded.
我的理解是把limit 设置为当前下标位置,然后下标归零,如果定义了mark,则丢弃
netty ByteBuf没有这样的方法,因为它有两个指针;一个用于读操作,另一个用于写操作。当您在读取器索
引未更改时向ByteBuf写入内容时,写入器索引会增加。 reader索引和writer索引分别表示消息的开始和结
束位置。
另一点需要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法返回一个
ChannelFuture。 ChannelFuture表示尚
未发生的I / O操作。 这意味着,任何请求的操作可能尚未执行,
因为所有操作在Netty中都是异步的。 例如,以下代码可能会在发送消息之前关闭连接:
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此,您需要在ChannelFuture完成之后调用close()方法,并在写入操作完成时通知其侦听器。 请注意,
close()也可能不会立即关闭连接,因为close 也返回ChannelFuture。
4.我们怎么知道写请求什么时候完成,让后来关闭连接呢,可以添加一个ChannelFutureListener,让其监听
channelfuture,当channelfuture完成任务时关闭连接,上面的方法中我们用了一个匿名ChannelFutureListener
更简单的替代方法是
f.addListener(ChannelFutureListener.CLOSE);
-
接下来写一个time client 来接受server发回 的消息
server 和 client没多大不同,只是使用了不同的 channel 和 bootstrap
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
1.Bootstrap与ServerBootstrap类似,不同之处在于它
适用于非服务器通道,例如客户端或无连接通道。
2.只指定一个EventLoopGroup,它将同时用作boss组和worker组。
3.
NioSocketChannel用于创建客户端通道,而不是NioServerSocketChannel。
4.请注意,我们不像在ServerBootstrap中那样使用childOption(),因为客户端S
ocketChannel没有
父服务器。
5.我们应该调用connect()方法而不是bind()方法。
-
ChannelHandler实现怎么样? 它应该从服务器接收一个32位整数,将其转换为人类可读的格式,
打印翻译的时间,并关闭连接:
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
1.在TCP / IP中,Netty将从对等方发送的数据读入ByteBuf。
-
上面的handler 有时候回发神经,抛出IndexOutOfBoundsException,原因如下:
在
基于流的传输(例如TCP / IP)中,接收的
数据存储在套接字接收缓冲区中。但是套接字缓冲区中存储的
不是包队列,而是字节队列。 这意味着,即使您将两条消息作为两个独立的数据包发送,操作系统也不会将
它们视为两条消息,而只是一堆字节。 因此,无法保证您所阅读的内容正是您的远程peer所写的内容。 例如,
假设操作系统的TCP / IP堆栈已收到三个数据包:
abc def ghi
由于基于流的协议的这种一般属性,应用程序很有可能以下面的碎片形式读取它们
ab cdef g hi
那么应该怎么解决这个问题呢?还记得之前在ChannelInitializer 中添加多个hadler吗?可以在pipeline中添加
一个hadler 来专门解决碎片化问题
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(
4)); // (4)
}
}
1.ByteToMessageDecoder是ChannelInboundHandler的一个实现,可以很容易地处理碎片问题。
2.每当收到新数据时,ByteToMessageDecoder都会使用内部维护的
累积缓冲区调用decode()方法。
3.如果累积缓冲区中没有足够数据,decode()不向out中添加内容。 当收到更多数据时,
ByteToMessageDecoder将再次调用decode()。
4.如果decode()将对象添加到out,则意味着解码器成功解码了一条消息。 ByteToMessageDecoder
将
丢弃累积缓冲区的
读取
部分。ByteToMessageDecoder将继续调用decode()方法,直到它不添加任
何内容。
下面来看看改版的ChannelInitializer
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});