当前位置 : 主页 > 编程语言 > java >

RocketMQ Push 消费模型示例详解

来源:互联网 收集:自由互联 发布时间:2023-01-30
目录 使用 DefaultMQPushConsumer 消费消息 基于长轮询机制的伪 push 实现 客户端侧发起的长轮询请求 服务端阻塞请求 客户端回调处理 客户端发起请求的底层逻辑 PullCallback 回调 总结 Push 模
目录
  • 使用 DefaultMQPushConsumer 消费消息
  • 基于长轮询机制的伪 push 实现
    • 客户端侧发起的长轮询请求
    • 服务端阻塞请求
    • 客户端回调处理
    • 客户端发起请求的底层逻辑
    • PullCallback 回调
  • 总结

    Push 模式是指由 Server 端来控制消息的推送,即当有消息到 Server 之后,会将消息主动投递给 client(Consumer 端)。

    使用 DefaultMQPushConsumer 消费消息

    下面是使用 DefaultMQPushConsumer 消费消息的官方示例代码:

    // 初始化consumer,并设置consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyGroup");
    // 设置NameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
    consumer.subscribe("TopicTest", "*");
    //注册回调接口来处理从Broker中收到的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS 为消费成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 启动Consumer
    consumer.start();
    

    这里看到主要是通过 consumer 注册回调接口来处理从 Broker 中收到的消息。这种监听回调的机制很容易想到是一种观察者模式或者事件机制;对于这种 C-S 模型的架构来说,如果要做到 Server 在有新消息时立即推送给 Client,那么 Client 和 Server 之间应该是有连接存在的,Client 端开放端口来 watch Server 的推送。这里好论证,即可以查看当前 Client 端所在进程开启了什么端口即可,通过如下指令查看:

    • 1、先通过 jps 查看 Consumer Client 的进程号
    ➜  rocketmq-4.9.4 git:(06f2208a3) jps
    10722 Jps
    4676 rocketmq-dashboard-1.0.1-SNAPSHOT.jar
    1766
    4121 BrokerStartup
    4009 NamesrvStartup
    9419 PushConsumer
    9692 RemoteMavenServer36
    

    可以看到 PushConsumer 的进程号是 9419

    • 2、通过 lsof 命令查看进程端口占用
    ➜  rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 9419| grep LISTEN
    ➜  
    

    这里没有看到 PushConsumer 有开启端口。同样,这里可以看看 Broker 的进程端口占用

    ➜  rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 4121| grep LISTEN
    java    4121 glmapper  137u    IPv6 0xca1142b0f200067d        0t0                 TCP *:10912 (LISTEN)
    java    4121 glmapper  141u    IPv6 0xca1142b0f1fc8cfd        0t0                 TCP *:10911 (LISTEN)
    java    4121 glmapper  142u    IPv6 0xca1142b0f1fc935d        0t0                 TCP *:10909 (LISTEN)
    

    所以得到一个初步的结论是,在 Push 模式下,Consumer Client 并没有启动端口来接收 Server 的消息推送。 那么 RocketMQ 是怎么实现的?

    基于长轮询机制的伪 push 实现

    真正的 Push 方式,是 Server 端接收到消息后,主动把消息推送给 Client 端,这种情况一般需要 Client 和 Server 之间建立长连接。通过前面的分析,Client 既然没有开启端口用于接收 Server 的信息推送,那么只有一种可能就是 Client 自己去拉了消息,但是这种主动拉消息的方式是对于用户无感的,从使用上体验上来看,做到了和 push 一样的效果;这种机制就是“长轮询”。

    为啥不用长连接方式,让 Server 主动 Push 呢?其实很好理解,对于一个提供队列服务的 Server 来说,用 Push方式主动推送有两个问题:

    • 1、会增加 Server 端的工作量,进而影响 Server 的性能
    • 2、Client 的处理能力存在差异,Client 的状态不受 Server 控制,如果 Client 不能及时处理 Server 推送过来的消息,会造成各种潜在问题

    客户端侧发起的长轮询请求

    下图是初始化相关资源的过程,DefaultMQPushConsumer 是面向用户使用的 API client 类,内部处理实际上是委托给 DefaultMQPushConsumerImpl 来处理的。DefaultMQPushConsumerImpl#start 时,会初始化 MQClientInstance ,MQClientInstance 初始化过程中又会初始化一堆资源,比如请求-响应的通道,开启各种各样的调度任务(定期拉去 NameServerAddress、定期更新 Topic 路由信息、定期清理 Offline状态的 Broker、定期发送心跳给 Broker、定期持久化所有 Consumer Offset等等),开启 pullMessageService,开启 rebalance Service 等等。大致的调用链如下图

    下面这个代码片段是 pullMessageService 的 run 方法(pullMessageService 是 Runnable 子类)

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                // 从 pullRequestQueue 中取 pullRequest
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }
    

    通过代码,可以直观的看起,pullMessageService 会一直从 pullRequestQueue 中取 pullRequest,然后执行 pullMessage 请求。实际上 MessageQueue 是和 pullRequest 一一对应的 ,pullRequest 全部存储到该 Consumer 的 pullRequestQueue 队列里面;消费者会不停的从 PullRequest 的队列里取 request 然后向broker 请求消息。

    这里还有一个问题是队列取出之后什么时候放回去的?在 pullMessage 的回调方法中,如果正常得到了 broker 的响应,那么会把 PullRequest放回队列,相关代码可以从 org.apache.rocketmq.client.consumer.PullCallbackonSuccess 方法中得到答案。

    服务端阻塞请求

    服务端处理 pullRequest 请求的是 PullMessageProcessor,当没有消息时,则通过 PullRequestHoldService 将当前请求先 hold 住。

    case ResponseCode.PULL_NOT_FOUND:
        if (brokerAllowSuspend && hasSuspendFlag) {
            long pollingTimeMills = suspendTimeoutMillisLong;
            // 如果是 LongPolling,则 hold 住
            if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
            }
            String topic = requestHeader.getTopic();
            long offset = requestHeader.getQueueOffset();
            int queueId = requestHeader.getQueueId();
            PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
            this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
            response = null;
            break;
        }
    

    PullRequestHoldService 中会将所有的 PullRequest 缓存到 pullRequestTable。PullRequestHoldService 也是一个 task,默认每次 hold 5s 然后再去检查是否有新的消息过来,如果有新的消息到来,则唤醒对应的线程来将消息返回给客户端。

    // 已省略无关代码
    public void run() {
        // loop
        while (!this.isStopped()) {
            // default hold 5s
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            // 检查是否有新的消息到达
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
    	}
    }
    

    客户端回调处理

    我们在编写 consumer 代码时,基于 push 模式是通过如下方式来监听消息的

    //注册回调接口来处理从Broker中收到的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS 为消费成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    

    通过前面的分析,对于如何通过“长轮询”实现伪“push” 有了大概得了解;客户端通过一个定时任务不断向 Broker 发请求,Broker 在没有消息时先 hold 住一小段时间,当有新的消息时会立即将消息返回给 consumer;本节就主要探讨 consumer 在收到消息之后的处理逻辑,以及是怎么触发 MessageListener 回调执行的。

    客户端发起请求的底层逻辑

    以异步调用为例,代码在

    org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync中,截取部分代码如下:

    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (response != null) {
                try {
                    PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
                    assert pullResult != null;
                    // 成功回调
                    pullCallback.onSuccess(pullResult);
                } catch (Exception e) {
                    // 异常回调
                    pullCallback.onException(e);
                }
            } else {
                if (!responseFuture.isSendRequestOK()) {
                     // 异常回调
                    pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                } else if (responseFuture.isTimeout()) {
                     // 异常回调
                    pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                        responseFuture.getCause()));
                } else {
                     // 异常回调
                    pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                }
            }
        }
    });
    

    PullCallback 回调

    PullCallback 回调逻辑在 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage方法中,以正常返回消息为例:

    // 已省略无关代码
    public void onSuccess(PullResult pullResult) {
        // 将接收到的消息 交给 consumeMessageService 处理
        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
            pullResult.getMsgFoundList(),
            processQueue,
            pullRequest.getMessageQueue(),
            dispatchToConsume);
        // 将 pullRequest 放回 pullRequestQueue
     DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    }
    

    ConsumeRequest 是一个 Runnable,submitConsumeRequest 就是将返回结果丢在一个单独的线程池中去处理返回结果的。ConsumeRequest 的 run 方法中,会拿到 messageListener,然后执行 consumeMessage 方法。

    总结

    到此,关于 RocketMQ push 消费模型基本就探讨完了。从实现机制上来看,push 本质上并不是在建立双向通道的前提下,由 Server 主动推送给 Client 的,而是由 Client 端触发 pullRequest 请求,以长轮询的方式“伪装”的结果。从代码上来,RocketMQ 代码中使用了非常多的异步机制,如 pullRequestQueue 来解耦发送请求和等待结果,各种定时任务等等。

    整体看,PushConsumer 采用了 长轮询+超时时间+Pull的模式, 这种方式带来的好处总结如下

    • 1、减少 Broker 的压力,避免由于不同 Consumer 消费能力导致 Broker 出现问题
    • 2、确保了 Consumer 不会负载过高,Consumer 在校验自己的缓存消息没有超过阈值才会去从 Broker 拉取消息,Broker 不会主动推过来
    • 3、兼顾了消息的即时性,Broker 在没有消息的时候会先 hold 一小段时间,有消息会立即唤起线程将消息返回给 Consumer
    • 4、Broker 端无效请求的次数大大降低,Broker 在没有消息时会挂起 PullRequest,而 Consumer 在未接收到Response 且未超时时,也不会重新发起 PullRequest

    以上就是RocketMQ Push 消费模型示例详解的详细内容,更多关于RocketMQ Push 消费模型的资料请关注自由互联其它相关文章!

    网友评论