当前位置 : 主页 > 网络编程 > 其它编程 >

Netty5服务端和客户端

来源:互联网 收集:自由互联 发布时间:2023-07-02
Netty5服务端和客户端更多干货分布式实战干货springcloud实战干货mybatis实战 Netty5 服务端和客户端 更多干货 分布式实战干货 spring cloud 实战干货 mybatis 实战干货 spring boot 实战干货 React 入门
Netty5服务端和客户端更多干货分布式实战干货springcloud实战干货mybatis实战

Netty5 服务端和客户端

更多干货

  • 分布式实战干货

  • spring cloud 实战干货

  • mybatis 实战干货

  • spring boot 实战干货

  • React 入门实战干货

  • 构建中小型互联网企业架构干货

  • python 学习持续更新

  • ElasticSearch 笔记

  • kafka storm 实战 (干货)

  • scala 学习持续更新

  • RPC

概述

netty 5 已经放弃掉了,作为学习netty4和5的差别不大,本例子是基于netty5

https://github.com/netty/netty/issues/4466

线程安全

  • 一个thread 队列 一个单线程线程池。线程安全的任务是线性串行执行的

  • 线程安全不会产生阻塞效应 使用对象组

  • 线程不安全会产生阻塞效应 使用对象池

Figure 1. 对象池Figure 2. 对象组

writeAndFlush 发送消息线程安全

其中 EventExecutor executor 是 NioEventLoop 线程安全

public final class NioEventLoop extends SingleThreadEventLoop

代码

Server

/*** netty5服务端***/public class Server {public static void main(String[] args) {//服务类ServerBootstrap bootstrap new ServerBootstrap();//boss和workerEventLoopGroup boss new NioEventLoopGroup();EventLoopGroup worker new NioEventLoopGroup();try {//设置线程池bootstrap.group(boss, worker);//设置socket工厂、bootstrap.channel(NioServerSocketChannel.class);//设置管道工厂bootstrap.childHandler(new ChannelInitializer() {Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ServerHandler());}});//netty3中对应设置如下//bootstrap.setOption("backlog", 1024);//bootstrap.setOption("tcpNoDelay", true);//bootstrap.setOption("keepAlive", true);//设置参数TCP参数bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置链接缓冲池的大小bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃清除死链接bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送//绑定端口ChannelFuture future bootstrap.bind(10101);System.out.println("start");//等待服务端关闭future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally{//释放资源boss.shutdownGracefully();worker.shutdownGracefully();}}}

ServerHandler

import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/*** 服务端消息处理***/public class ServerHandler extends SimpleChannelInboundHandler {Overrideprotected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(msg);ctx.channel().writeAndFlush("hi");ctx.writeAndFlush("hi");}/*** 新客户端接入*/Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("channelActive");}/*** 客户端断开*/Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("channelInactive");}/*** 异常*/Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();}}

Client

import java.io.BufferedReader;import java.io.InputStreamReader;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/*** netty5的客户端***/public class Client {public static void main(String[] args) {//服务类Bootstrap bootstrap new Bootstrap();//workerEventLoopGroup worker new NioEventLoopGroup();try {//设置线程池bootstrap.group(worker);//设置socket工厂、bootstrap.channel(NioSocketChannel.class);//设置管道bootstrap.handler(new ChannelInitializer() {Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ClientHandler());}});ChannelFuture connect bootstrap.connect("127.0.0.1", 10101);BufferedReader bufferedReader new BufferedReader(new InputStreamReader(System.in));while(true){System.out.println("请输入");String msg bufferedReader.readLine();connect.channel().writeAndFlush(msg);}} catch (Exception e) {e.printStackTrace();} finally{worker.shutdownGracefully();}}}

ClientHandler

import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/*** 客户端消息处理***/public class ClientHandler extends SimpleChannelInboundHandler {Overrideprotected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("客户端收到消息:"msg);}}

多连接客户端

MultClient

  • 其中 synchronized(channel) 可以采用 单任务队列的方式

import java.util.ArrayList;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/*** 多连接客户端***/public class MultClient {/*** 服务类*/private Bootstrap bootstrap new Bootstrap();/*** 会话*/private List channels new ArrayList();/*** 引用计数*/private final AtomicInteger index new AtomicInteger();/*** 初始化* param count*/public void init(int count){//workerEventLoopGroup worker new NioEventLoopGroup();//设置线程池bootstrap.group(worker);//设置socket工厂、bootstrap.channel(NioSocketChannel.class);//设置管道bootstrap.handler(new ChannelInitializer() {Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ClientHandler());}});for(int i1; i channels.size()){throw new RuntimeException("no can use channel");}return getFirstActiveChannel(count 1);}return channel;}/*** 重连* param channel*/private void reconnect(Channel channel){synchronized(channel){if(channels.indexOf(channel) -1){return ;}Channel newChannel bootstrap.connect("192.168.0.103", 10101).channel();channels.set(channels.indexOf(channel), newChannel);}}}

Start

import java.io.BufferedReader;import java.io.InputStreamReader;/*** 启动类***/public class Start {public static void main(String[] args) {MultClient client new MultClient();client.init(5);BufferedReader bufferedReader new BufferedReader(new InputStreamReader(System.in));while(true){try {System.out.println("请输入:");String msg bufferedReader.readLine();client.nextChannel().writeAndFlush(msg);} catch (Exception e) {e.printStackTrace();}}}}

网友评论