当前位置 : 主页 > 编程语言 > 其它开发 >

消息队列,推拉模式的区别在哪?

来源:互联网 收集:自由互联 发布时间:2022-05-18
大家好,我是【架构摆渡人】,一只十年的程序猿。这是消息队列的第五篇文章,这个系列会给大家分享很多在实际工作中有用的经验,如果有收获,还请分享给更多的朋友。 在学习消

大家好,我是【架构摆渡人】,一只十年的程序猿。这是消息队列的第五篇文章,这个系列会给大家分享很多在实际工作中有用的经验,如果有收获,还请分享给更多的朋友。

在学习消息队列的时候,大家都有一个共同的问题,那就是消息到底是服务端推送给客户端还是客户端主动去服务端拉取然后进行消费。今天这篇文章就来解答大家的这个的疑问。

推模式

首先我们来解决下什么是推模式,顾名思义,推模式就是我推给你。在MQ中也就是Broker收到消息后主动推送给Consumer的操作,叫做推模式。

推模式的实现是客户端会与服务端(Broker)建立长连接,当有消息时服务端会通过长连接通道将消息推送给客户端,这样客户端就能实时消费到最新的消息。

优点:

  • 实时性强,有消息立马推送给客户端。

  • 客户端实现简单,只需要监听服务端的推送即可。
    缺点:

  • 容易导致客户端发生消息堆积的情况,因为每个客户端的消费能力是不同的,如果简单粗暴的有消息就推送,就会会出现堆积情况。

  • 服务端逻辑复杂,因为简单的推送会导致客户端出现堆积问题,所以服务端需要进行优化。记录给每个客户端的推送数据,然后根据每个客户端的消费能力去平衡数据推送的速度。

拉模式

拉模式,顾名思义,就是我主动去拉取消息。在MQ中也就是Consumer主动向Broker询问:有没有消息啊,有的话给我一部分呗,我先拉1000条进行处理,处理完成之后再拉1000条。

拉模式肯定不能用传统的定时拉取,定时长及时性无法保证,定时短,在没有消息的情况下对服务端会一直请求。所以很多拉模式都是基于长轮询来实现。

长轮询就是客户端向服务端发起请求,如果此时有数据就直接返回,如果没有数据就保持连接,等到有数据时就直接返回。如果一直没有数据,超时后客户端再次发起请求,保持连接,这就是长轮询的实现原理。很多的开源框架都是用的这种方式,比如配置中心Apollo的推送。

优点:

  • 不会造成客户端消息积压,消费完了再去拉取,主动权在自己手中。

  • 长轮询实现的拉模式实时性也能够保证。
    缺点:

  • 客户端的逻辑实现相对复杂点,简化了服务端的逻辑。
    推和拉都有各自的优势和劣势,不过目前主流的消息队列大部分都用的拉模式,比如RocketMQ,Kafka。

拉模式代码实现

Java中可以使用Spring DeferredResult来实现异步请求,如果有消息就直接返回,没有消息则将此请求存储起来,等到有消息是再通知该请求进行返回。如果一直没有消息那么就等到超时,客户端收到超时消息重新进行消息的查询。

首先我们定义一个消息查询的接口,定义如下:

@GetMapping("/queryMessage")
public DeferredResult<String> queryMessage(String client) {
    DeferredResult<String> deferredResult = new DeferredResult<>(10000L);
    String msg = messageQueue.poll();
    if (Objects.nonNull(msg)) {
        deferredResult.setResult(msg);
    } else {
        deferredResult.onTimeout(() -> {
            deferredResultMap.remove(client);
        });
        deferredResultMap.put(client, deferredResult);
    }
    return deferredResult;
}

指定DeferredResult的超时时间为10秒,然后从messageQueue中获取消息,此处的逻辑就是获取没有被消息的消息,这里只是模拟。

如果有消息直接设置DeferredResult的result,立马返回。如果当前没有消息则注册一个超时的回调,进行DeferredResult的移除动作。同时将DeferredResult对象缓存起来。

然后我们在写一个添加消息的接口,定义如下:

@GetMapping("/addMessage")
public String addMessage(String client) {
    messageQueue.add("test");
    DeferredResult deferredResult = deferredResultMap.get(client);
    if (Objects.nonNull(deferredResult)) {
        deferredResult.setResult("test");
    }
    return "success";
}

当有消息添加的时候,根据对应的client获取缓存的DeferredResult,如果有的话就直接设置结果,立马返回,这样客户端就能立马收到新的消息,实时性也有保证。

接下来模拟一个客户端去查询消息,定义如下:

ublic class MqClient {
    public static void main(String[] args) {
        queryMessage();
    }
    private static void queryMessage() {
        String result = request("http://localhost:8080/queryMessage?client=xxx");
        if (result != null) {
            // 本地进行消费
            // ......
        }
        // 继续拉取消息
        queryMessage();
    }
    private static String request(String url) {
        HttpURLConnection connection = null;
        BufferedReader reader = null;
        try {
            URL getUrl = new URL(url);
            connection = (HttpURLConnection) getUrl.openConnection();
            connection.setReadTimeout(20000);
            connection.setConnectTimeout(3000);
            connection.setRequestMethod("GET");
            connection.setRequestProperty("Accept-Charset", "utf-8");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setRequestProperty("Charset", "UTF-8");
            System.out.println(connection.getResponseCode());
            if (200 == connection.getResponseCode()) {
                reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
                StringBuilder result = new StringBuilder();
                String line = null;
                while ((line = reader.readLine()) != null) {
                    result.append(line);
                }
                System.out.println("结果 " + result);
                return result.toString();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
        return null;
    }
}

这里需要注意的是,客户端的请求超时时间要大于服务端定义的超时时间,主流程就是有消息进行本地消费,然后继续拉取。

如果没有消息,请求会一直等待,知道服务端超时,此时客户端这边会拿到http response code的值为503,然后继续查询消息。

所以大家可以看到,拉模式主要是客户端来主导,至于拉取速度客户端都可以进行控制,如果消息量够大的话,每次拉取都能拿到没有被消费的数据,基本上不会产生等等超时的情况。即使某些时候没有拉取到新的消息,只要有新消息,服务端也会立马获取等待的DeferredResult进行结果的设置,立马响应结果。

总结

本文给大家介绍了推拉模式的概念以及各自的优劣势,同时也介绍了拉模式的实现原理,当然本文所示的代码并不代码开源框架里面就是用的这种方式,只是告诉大家长轮询的基本实现方式。

如果大家有这样推送的场景,如果想用最简单的方式实现,长轮询是一个不错的方式。在很多开源框架中都有类似的应用。

原创:架构摆渡人(公众号ID:jiagoubaiduren),欢迎分享,转载请保留出处。

开发自测mock框架:https://github.com/yinjihuan/fox-mock

网友评论