当前位置 : 主页 > 编程语言 > 其它开发 >

netty通信

来源:互联网 收集:自由互联 发布时间:2022-05-20
学习netty之前,要先了解操作系统中的IO、零拷贝(已经附上链接了) 一、netty的简单介绍 Netty 是由 JBOSS 提供的一个 Java 开源框架 ,现为 Github 上的独立项目。 Netty 是一个 异步的、基

学习netty之前,要先了解操作系统中的IO、零拷贝(已经附上链接了)

一、netty的简单介绍
  • Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github 上的独立项目。
  • Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。
  • Netty 主要针对在 TCP 协议下,面向 Client 端的高并发应用,或者 **Peer-to-Peer **场景下的大量数据持续传输的应用。
  • Netty 本质是一个 NIO 框架,适用于服务器通讯相关的多种应用场景。
  • Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信
为什么有了netty框架

原生的NIO存在问题:

  • NIO的类库和API繁杂:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer
  • 要熟悉Java多线程编程,因为NIO编程涉及到Reactor模式
  • Selector可能出现空轮询,占用CPU资源

netty是对NIO进行封装,解决了上述问题:

  • 设计优雅:
    适用于各种传输类型的统一API,阻塞和非阻塞Socket;
    基于灵活且可扩展的事件模型,可以清晰地分离关注点;
    高度可定制的线程模型
  • 高性能、吞吐量更高:
    延迟更低;
    减少资源消耗;
    最小化不必要的内存复制。
  • 安全:完整的 SSL/TLS 和 StartTLS 支持。
  • 社区活跃、不断更新
二、线程模式

下面讲解的是netty线程模式的由来

现有的线程模式:

  • BIO的传统阻塞IO服务模型
  • NIO的Reactor模型
    单 Reactor 单线程;
    单 Reactor多线程;
    主从 Reactor多线程
2.1 BIO的传统阻塞IO服务模型

1、每个连接都需要独立的线程完成数据的输入,业务处理,数据返回。当并发数很大,就会创建大量的线程,占用很大系统资源。
2、连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 Handler对象中的read 操作,导致上面的处理线程资源浪费。
image

2.2 NIO的Reactor模型

基于I/O多路复用模型:多个客户端进行连接,先把连接请求给Reactor,多个连接共用一个阻塞对象Reactor,由Reactor负责监听和分发,当客户端连接没有数据时不会阻塞线程。
基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。(解决了当并发数很大时,会创建大量线程,占用很大系统资源)

Reactor模式中核心组成:

  • Reactor:在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理线程来对IO事件做出反应。
  • Handlers:处理线程执行IO事件。
单Reactor单线程

特点:

  • 该模型简单没有多线程的竞争,由一个线程完成所有的操作(监听、分发、执行),没有充分利用多核CPU
  • 因为Reactor是单线程运行,因此在处理某个handler的IO事件时,其他的handler需要进行等待,等待时间长因为该线程还要处理业务;
  • 当Reactor出现问题,就会造成业务模块不可用

image
上图解析:

  • Reactor对象通过select监控客户端请求事件,收到事件后通过dispatch 进行分发
  • 如果是建立连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理
  • 如果不是建立连接事件,则Reactor会分发处理线程来处理Handler的IO事件(完成 Read → 业务处理 → send 的完整业务流程)
单Reactor多线程

特点:

  • 充分利用多核CPU,可能出现多线程竞争
  • 因为Reactor是单线程运行,因此在处理某个handler的IO事件时,其他的handler需要进行等待,等待时间短因为处理业务交给worker线程池执行;
  • Reactor承担所有的事件的监听和响应,它是单线程运行,在高并发场景容易出现性能瓶颈
  • 当Reactor出现问题,就会造成业务模块不可用

image

上图解析:

  • Reactor对象通过select监控客户端请求事件,收到事件后,通过Dispatch 进行分发
  • 如果是建立连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件
  • 如果不是连接请求事件,则Reactor会分发处理线程来处理Handler的IO事件,handler只负责响应事件(read、send),不做具体的业务处理交给worker线程池执行(这样不会使handler阻塞太久)
  • worker线程池会分配独立的worker线程完成真正的业务,并将结果返回给handler,handler收到响应后,通过send将结果返回给client
主从Reactor多线程

特点:

  • 主Reactor可已有多个,从Reactor也可以有多个
  • 主Reactor负责处理建立连接请求事件,从Reactor负责处理业务请求事件
  • 当从Reactor出现问题,可以调用其他的从Reactor提供服务

image

上图解析:

  • Reactor主线程 MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor的accept处理连接事件
  • 当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor,subreactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理
  • 当有新事件发生时,subreactor就会分发处理线程来处理handler,handler通过read读取数据,分发给后面的worker线程池处理。
  • worker线程池分配独立的worker线程进行业务处理,并返回结果。handler 收到响应的结果后,再通过send将结果返回给client
  • Reactor主线程可以对应多个Reactor子线程,即MainRecator可以关联多个 SubReactor

生活中的体现:
单 Reactor 单线程:前台接待员和服务员是同一个人,全程为顾客服务
单 Reactor 多线程:1 个前台接待员,多个服务员,接待员只负责接待
主从 Reactor 多线程:多个前台接待员,多个服务生

2.3 netty线程模型

netty线程模型主要是依据主从Reactor多线程
image

  • BossGroup中的NioEventLoop就像MainReactor可以有多个,WorkerGroup中的NioEventLoop就像SubReactor一样可以有多个。

  • Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写

  • BossGroup和WorkerGroup类型都是NioEventLoopGroup,NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop(NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop)

  • NioEventLoop表示一个不断循环的执行处理任务的线程,每个 NioEventLoop都有一个Selector,用于监听绑定在其上的socket的网络通讯

  • 每个BossGroup下面的NioEventLoop循环执行的步骤有3步:
    1、轮询 accept 事件
    2、处理 accept 事件,与 client 建立连接,生成NioSocketChannel,并将其注册到某个WorkerGroup的NioEventLoop上的Selector
    3、继续处理任务队列的任务,即runAllTasks

  • 每个WorkerGroup下面的NioEventLoop循环执行的步骤有3步:
    1、轮询 read,write 事件
    2、处理 I/O 事件,即 read,write 事件,在对应NioSocketChannel处理
    3、处理任务队列的任务,即runAllTasks

  • 每个WorkerGroup的NioEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel(通道),即通过pipeline可以获取到对应通道,每个通道中都有一个channelPipeline维护了很多的处理器channelhandler。

netty案例-TCP服务

服务端:
NettyServer

NettyServer代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws Exception {


        //创建BossGroup 和 WorkerGroup
        //说明
        //1. 创建两个线程组 bossGroup 和 workerGroup
        //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
        //3. 两个都是无限循环
        //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
        //   默认实际 cpu核数 * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8



        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();

            //使用链式编程来进行设置
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
//                    .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
                    .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
                        //给pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器

            System.out.println(".....服务器 is ready...");

            //绑定一个端口并且同步生成了一个 ChannelFuture 对象(也就是立马返回这样一个对象)
            //启动服务器(并绑定端口)
            ChannelFuture cf = bootstrap.bind(6668).sync();

            //给cf 注册监听器,监控我们关心的事件

            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口 6668 成功");
                    } else {
                        System.out.println("监听端口 6668 失败");
                    }
                }
            });


            //对关闭通道事件  进行监听
            cf.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}


NettyServerHandler

NettyServerHandler代码
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;

import java.util.concurrent.TimeUnit;

/*
说明
1. 我们自定义一个Handler 需要继承netty 规定好的某个HandlerAdapter(规范)
2. 这时我们自定义一个Handler , 才能称为一个handler
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //读取数据事件(这里我们可以读取客户端发送的消息)
    /*
    1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
    2. Object msg: 就是客户端发送的数据 默认Object
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
        System.out.println("server ctx =" + ctx);
        System.out.println("看看channel 和 pipeline的关系");
        Channel channel = ctx.channel();
        ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链表


        //将 msg 转成一个 ByteBuf
        //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:" + channel.remoteAddress());
    }

    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        //writeAndFlush 是 write + flush
        //将数据写入到缓存,并刷新
        //一般讲,我们对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
    }

    //发生异常后, 一般是需要关闭通道

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端:
NettyClient

NettyClient代码
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) throws Exception {

        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();


        try {
            //创建客户端启动对象
            //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
                        }
                    });

            System.out.println("客户端 ok..");

            //启动客户端去连接服务器端
            //关于 ChannelFuture 要分析,涉及到netty的异步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //对关闭通道事件  进行监听
            channelFuture.channel().closeFuture().sync();
        }finally {

            group.shutdownGracefully();

        }
    }
}

NettyClientHandler

NettyClientHandler代码
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    //当通道就绪就会触发该方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
    }

    //当通道有读取事件时,会触发
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}


netty中的参数组件 1、Bootstrap、ServerBootstrap

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。

2、Future、ChannelFuture

Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以立即返回一个ChannelFuture,它可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件

  • Channel channel(),返回当前正在进行 IO 操作的通道
  • ChannelFuture sync(),等待异步操作执行完毕,同步执行注册的监听事件

Future-Listener 机制
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。

  • 通过 isDone 方法来判断当前操作是否完成;
  • 通过 isSuccess 方法来判断已完成的当前操作是否成功;
  • 通过 getCause 方法来获取已完成的当前操作失败的原因;
  • 通过 isCancelled 方法来判断已完成的当前操作是否被取消;
  • 通过 addListener 方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;
3、Channel
  • Netty 网络通信的组件,能够用于执行网络 I/O 操作。
  • Netty 网络通信的组件,能够用于执行网络 I/O 操作。
  • Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回一个ChannelFuture,并且不保证在调用结束时所请求的 I/O 操作已完(后期通过ChannelFuture的方法查看异步执行结果)
  • 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应
    NioSocketChannel,异步的客户端 TCP Socket 连接。
    NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
    NioDatagramChannel,异步的 UDP 连接。
4、Selector
  • Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
  • 当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select)这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel
5、ChannelHandler 及其实现类
  • ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
  • ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类
  • 当自定义一个handler类处理器时,需要继承ChannelhandlerAdapter
    image
6、Pipeline 和 ChannelPipeline
  • Pipeline中包含了多个Channel(通道)
  • 一个Channel包含了一个ChannelPipeline,而ChannePipeline中又维护了一个由ChannelHandlerContext组成的双向链表,并且每个channeHandlerContext中又关联着一个channelHandler
  • ChannelPipeline是保存ChannelHandler的List,用于处理或拦截Channel 的入站事件和出站事件操作
  • 入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的 handler,出站事件会从链表tail往前传递到最前t个出站的handler, c两种类型的handler互不干扰
  • ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式
    image

ChannelPipeline addFirst(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的第一个位置
ChannelPipeline addLast(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的最后一个位置

7、ChannelHandlerContext
  • 保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象
  • 即ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline 和 Channel 的信息,方便对ChannelHandler进行调用。

ChannelHandlerContext的常用方法:
1、ChannelFuture close(),关闭通道
2、ChannelOutboundInvoker flush(),刷新
3、ChannelFuture writeAndFlush(Object msg),将数据写到ChannelPipeline 中当前 ChannelHandler 的下一个ChannelHandler 开始处理(出站)

8、ChannelOption

Netty在创建Channel实例后,一般都需要设置 ChannelOption 参数
image

9、EventLoopGroup 和其实现类 NioEventLoopGroup
  • EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
  • EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
  • BossEventLoop负责接收客户端的连接并将SocketChannel注册到 WorkerEventLoopGroup的其中一个workerEventLoop的selector上并进行后续的IO事件处理
10、Unpooled类
  • Netty提供一个专门用来操作缓冲区(即 Netty 的数据容器)的工具类

ByteBuf buffer = Unpooled.buffer(10);
ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));

netty的案例-群聊系统

GroupChatServer

GroupChatServer代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class GroupChatServer {

    private int port; //监听端口


    public GroupChatServer(int port) {
        this.port = port;
    }

    //编写run方法,处理客户端的请求
    public void run() throws  Exception{

        //创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop

        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            //获取到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //向pipeline加入解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己的业务处理handler
                            pipeline.addLast(new GroupChatServerHandler());

                        }
                    });

            System.out.println("netty 服务器启动");
            ChannelFuture channelFuture = b.bind(port).sync();

            //监听关闭
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {

        new GroupChatServer(7000).run();
    }
}

GroupChatServerHandler

GroupChatServerHandler代码
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {

    //这样写还要自己遍历Channel
    //public static List<Channel> channels = new ArrayList<Channel>();

    //使用一个hashmap 管理私聊(私聊本案例并未实现,只是提供个思路)
    //public static Map<String, Channel> channels = new HashMap<String,Channel>();

    //定义一个channle 组,管理所有的channel
    //GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例
    private static ChannelGroup  channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    //handlerAdded 表示连接建立,一旦连接,第一个被执行
    //将当前channel 加入到  channelGroup
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //将该客户加入聊天的信息推送给其它在线的客户端

        //该方法会将 channelGroup 中所有的channel 遍历,并发送消息,我们不需要自己遍历

        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
        channelGroup.add(channel);

		//私聊如何实现
//         channels.put("userid100",channel);
		

    }

    //断开连接, 将xx客户离开信息推送给当前在线的客户
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
        System.out.println("channelGroup size" + channelGroup.size());

    }

    //表示channel 处于活动状态, 提示 xx上线
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //这个是给服务端看的,客户端上面已经提示xxx加入群聊了
        System.out.println(ctx.channel().remoteAddress() + " 上线了~");
    }

    //表示channel 处于不活动状态, 提示 xx离线了
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        System.out.println(ctx.channel().remoteAddress() + " 离线了~");
    }

    //读取数据,转发给在线的每一个客户端
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

        //获取到当前channel
        Channel channel = ctx.channel();
        //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息

        channelGroup.forEach(ch -> {
            if(channel != ch) { //不是当前的channel,转发消息
                ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");
            }else {//回显自己发送的消息给自己
                ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //关闭通道
        ctx.close();
    }
}

GroupChatClient

GroupChatClient代码
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;


public class GroupChatClient {

    //属性
    private final String host;
    private final int port;

    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();

        try {


        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {

                        //得到pipeline
                        ChannelPipeline pipeline = ch.pipeline();
                        //加入相关handler
                        pipeline.addLast("decoder", new StringDecoder());
                        pipeline.addLast("encoder", new StringEncoder());
                        //加入自定义的handler
                        pipeline.addLast(new GroupChatClientHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
        //得到channel
            Channel channel = channelFuture.channel();
            System.out.println("-------" + channel.localAddress()+ "--------");
            //客户端需要输入信息,创建一个扫描器
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                //通过channel 发送到服务器端
                channel.writeAndFlush(msg + "\r\n");
            }
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}

GroupChatClientHandler

GroupChatClientHandler代码
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

    //从服务器拿到的数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}
netty的心跳检测机制

IdleStateHandler是netty 提供的处理空闲状态的处理器(放在ChannelPipeline维护的ChannelHandler的双向链表上):

  • long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接
  • long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
  • long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接
netty的编解码器

编解码器放在ChannelPipeline维护的ChannelHandler的双向链表上
服务端发数据给客户端:服务端—>出站(编码)—>Socket通道—>入站(解码)—>客户端
客户端发数据给服务端:客户端—>出站(编码)—>Socket通道—>入站(解码)—>服务端
image

编解码器(自定义编解码器时需要继承下面的其中一个类):

  • ByteToMessageDecoder
  • LineBasedFrameDecoder:这个类在 Netty 内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
  • DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
  • HttpObjectDecoder:一个 HTTP 数据的解码器
  • LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。
TCP 粘包和拆包及解决方案

TCP粘包和拆包
使用优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,出现了粘包和拆包的问题,因为面向流的通信是无消息保护边界的
image

  • 服务端分两次读取到了两个独立的数据包,分别是 D1 和 D2,没有粘包和拆包
  • 服务端一次接受到了两个数据包,D1 和 D2 粘合在一起,称之为 TCP 粘包
  • 服务端分两次读取到了数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容,这称之为 TCP 拆包
  • 服务端分两次读取到了数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余部分内容 D1_2 和完整的 D2 包。

解决方案
使用自定义协议+编解码器来解决,关键就是要解决服务器端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的 TCP 粘包、拆包。

RPC(基于netty)

1、RPC(Remote Procedure Call)远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
2、两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样

RPC的调用流程图
image

  • 服务消费方(client)以本地调用方式调用服务
  • client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  • client stub 将消息进行编码并发送到服务端
  • server stub 收到消息后进行解码
  • server stub 根据解码结果调用本地的服务
  • 本地服务执行并将结果返回给 server stub
  • server stub 将返回导入结果进行编码并发送至消费方
  • client stub 接收到消息并进行解码
  • 服务消费方(client)得到结果

RPC 的目标就是将 2 - 8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用

上一篇:云原生入门第六章:持续交付
下一篇:没有了
网友评论