Netty5 服务端和客户端更多干货
分布式实战干货
spring cloud 实战干货
mybatis 实战干货
spring boot 实战干货
React 入门实战干货
构建中小型互联网企业架构干货
python 学习持续更新
ElasticSearch 笔记
kafka storm 实战 (干货)
scala 学习持续更新
RPC
概述
分布式实战干货
spring cloud 实战干货
mybatis 实战干货
spring boot 实战干货
React 入门实战干货
构建中小型互联网企业架构干货
python 学习持续更新
ElasticSearch 笔记
kafka storm 实战 (干货)
scala 学习持续更新
RPC
netty 5 已经放弃掉了,作为学习netty4和5的差别不大,本例子是基于netty5
https://github.com/netty/netty/issues/4466
线程安全
一个thread 队列 一个单线程线程池。线程安全的任务是线性串行执行的
线程安全不会产生阻塞效应 使用对象组
线程不安全会产生阻塞效应 使用对象池
writeAndFlush 发送消息线程安全
其中 EventExecutor executor 是 NioEventLoop 线程安全
public final class NioEventLoop extends SingleThreadEventLoop
代码
Server
/*** netty5服务端***/public class Server {public static void main(String[] args) {//服务类ServerBootstrap bootstrap new ServerBootstrap();//boss和workerEventLoopGroup boss new NioEventLoopGroup();EventLoopGroup worker new NioEventLoopGroup();try {//设置线程池bootstrap.group(boss, worker);//设置socket工厂、bootstrap.channel(NioServerSocketChannel.class);//设置管道工厂bootstrap.childHandler(new ChannelInitializer() {Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ServerHandler());}});//netty3中对应设置如下//bootstrap.setOption("backlog", 1024);//bootstrap.setOption("tcpNoDelay", true);//bootstrap.setOption("keepAlive", true);//设置参数TCP参数bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置链接缓冲池的大小bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃清除死链接bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送//绑定端口ChannelFuture future bootstrap.bind(10101);System.out.println("start");//等待服务端关闭future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally{//释放资源boss.shutdownGracefully();worker.shutdownGracefully();}}}
ServerHandler
import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/*** 服务端消息处理***/public class ServerHandler extends SimpleChannelInboundHandler {Overrideprotected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(msg);ctx.channel().writeAndFlush("hi");ctx.writeAndFlush("hi");}/*** 新客户端接入*/Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("channelActive");}/*** 客户端断开*/Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("channelInactive");}/*** 异常*/Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();}}
Client
import java.io.BufferedReader;import java.io.InputStreamReader;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/*** netty5的客户端***/public class Client {public static void main(String[] args) {//服务类Bootstrap bootstrap new Bootstrap();//workerEventLoopGroup worker new NioEventLoopGroup();try {//设置线程池bootstrap.group(worker);//设置socket工厂、bootstrap.channel(NioSocketChannel.class);//设置管道bootstrap.handler(new ChannelInitializer() {Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ClientHandler());}});ChannelFuture connect bootstrap.connect("127.0.0.1", 10101);BufferedReader bufferedReader new BufferedReader(new InputStreamReader(System.in));while(true){System.out.println("请输入");String msg bufferedReader.readLine();connect.channel().writeAndFlush(msg);}} catch (Exception e) {e.printStackTrace();} finally{worker.shutdownGracefully();}}}
ClientHandler
import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/*** 客户端消息处理***/public class ClientHandler extends SimpleChannelInboundHandler {Overrideprotected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("客户端收到消息:"msg);}}
多连接客户端
MultClient
其中 synchronized(channel) 可以采用 单任务队列的方式
import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/*** 多连接客户端***/public class MultClient {/*** 服务类*/private Bootstrap bootstrap new Bootstrap();/*** 会话*/private List channels new ArrayList();/*** 引用计数*/private final AtomicInteger index new AtomicInteger();/*** 初始化* param count*/public void init(int count){//workerEventLoopGroup worker new NioEventLoopGroup();//设置线程池bootstrap.group(worker);//设置socket工厂、bootstrap.channel(NioSocketChannel.class);//设置管道bootstrap.handler(new ChannelInitializer() {Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ClientHandler());}});for(int i1; i channels.size()){throw new RuntimeException("no can use channel");}return getFirstActiveChannel(count 1);}return channel;}/*** 重连* param channel*/private void reconnect(Channel channel){synchronized(channel){if(channels.indexOf(channel) -1){return ;}Channel newChannel bootstrap.connect("192.168.0.103", 10101).channel();channels.set(channels.indexOf(channel), newChannel);}}}
Start
import java.io.BufferedReader;import java.io.InputStreamReader;/*** 启动类***/public class Start {public static void main(String[] args) {MultClient client new MultClient();client.init(5);BufferedReader bufferedReader new BufferedReader(new InputStreamReader(System.in));while(true){try {System.out.println("请输入:");String msg bufferedReader.readLine();client.nextChannel().writeAndFlush(msg);} catch (Exception e) {e.printStackTrace();}}}}