Netty5 服务端和客户端
更多干货
分布式实战干货
spring cloud 实战干货
mybatis 实战干货
spring boot 实战干货
React 入门实战干货
构建中小型互联网企业架构干货
python 学习持续更新
ElasticSearch 笔记
kafka storm 实战 (干货)
scala 学习持续更新
RPC
概述
Netty 5 已被官方放弃维护(见下方 issue)。不过作为学习用途,Netty 4 和 Netty 5 的差别不大,本例子基于 Netty 5。
https://github.com/netty/netty/issues/4466
线程安全
一个线程 + 一个任务队列 = 一个单线程线程池。提交到其中的任务按顺序串行执行,因此是线程安全的。
线程安全的对象可以被多个线程同时共享,不会产生阻塞效应,因此使用对象组。
线程不安全的对象同一时刻只能被一个线程占用,会产生阻塞效应,因此使用对象池。
writeAndFlush 发送消息线程安全
其中 EventExecutor executor 实际上是 NioEventLoop,它是单线程的事件循环,因此是线程安全的:
public final class NioEventLoop extends SingleThreadEventLoop
代码
Server
/*** netty5服务端***/public class Server {public static void main(String[] args) {//服务类ServerBootstrap bootstrap new ServerBootstrap();//boss和workerEventLoopGroup boss new NioEventLoopGroup();EventLoopGroup worker new NioEventLoopGroup();try {//设置线程池bootstrap.group(boss, worker);//设置socket工厂、bootstrap.channel(NioServerSocketChannel.class);//设置管道工厂bootstrap.childHandler(new ChannelInitializer() {Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ServerHandler());}});//netty3中对应设置如下//bootstrap.setOption("backlog", 1024);//bootstrap.setOption("tcpNoDelay", true);//bootstrap.setOption("keepAlive", true);//设置参数TCP参数bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置链接缓冲池的大小bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃清除死链接bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送//绑定端口ChannelFuture future bootstrap.bind(10101);System.out.println("start");//等待服务端关闭future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally{//释放资源boss.shutdownGracefully();worker.shutdownGracefully();}}}
ServerHandler
import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/*** 服务端消息处理***/public class ServerHandler extends SimpleChannelInboundHandler {Overrideprotected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(msg);ctx.channel().writeAndFlush("hi");ctx.writeAndFlush("hi");}/*** 新客户端接入*/Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("channelActive");}/*** 客户端断开*/Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("channelInactive");}/*** 异常*/Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();}}
Client
import java.io.BufferedReader;import java.io.InputStreamReader;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/*** netty5的客户端***/public class Client {public static void main(String[] args) {//服务类Bootstrap bootstrap new Bootstrap();//workerEventLoopGroup worker new NioEventLoopGroup();try {//设置线程池bootstrap.group(worker);//设置socket工厂、bootstrap.channel(NioSocketChannel.class);//设置管道bootstrap.handler(new ChannelInitializer() {Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ClientHandler());}});ChannelFuture connect bootstrap.connect("127.0.0.1", 10101);BufferedReader bufferedReader new BufferedReader(new InputStreamReader(System.in));while(true){System.out.println("请输入");String msg bufferedReader.readLine();connect.channel().writeAndFlush(msg);}} catch (Exception e) {e.printStackTrace();} finally{worker.shutdownGracefully();}}}
ClientHandler
import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/*** 客户端消息处理***/public class ClientHandler extends SimpleChannelInboundHandler {Overrideprotected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("客户端收到消息:"msg);}}
多连接客户端
MultClient
其中 synchronized(channel) 也可以改用单线程任务队列的方式来实现。
import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/*** 多连接客户端***/public class MultClient {/*** 服务类*/private Bootstrap bootstrap new Bootstrap();/*** 会话*/private List channels new ArrayList();/*** 引用计数*/private final AtomicInteger index new AtomicInteger();/*** 初始化* param count*/public void init(int count){//workerEventLoopGroup worker new NioEventLoopGroup();//设置线程池bootstrap.group(worker);//设置socket工厂、bootstrap.channel(NioSocketChannel.class);//设置管道bootstrap.handler(new ChannelInitializer() {Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ClientHandler());}});for(int i1; i channels.size()){throw new RuntimeException("no can use channel");}return getFirstActiveChannel(count 1);}return channel;}/*** 重连* param channel*/private void reconnect(Channel channel){synchronized(channel){if(channels.indexOf(channel) -1){return ;}Channel newChannel bootstrap.connect("192.168.0.103", 10101).channel();channels.set(channels.indexOf(channel), newChannel);}}}
Start
import java.io.BufferedReader;import java.io.InputStreamReader;/*** 启动类***/public class Start {public static void main(String[] args) {MultClient client new MultClient();client.init(5);BufferedReader bufferedReader new BufferedReader(new InputStreamReader(System.in));while(true){try {System.out.println("请输入:");String msg bufferedReader.readLine();client.nextChannel().writeAndFlush(msg);} catch (Exception e) {e.printStackTrace();}}}}