当前位置 : 主页 > 操作系统 > centos >

RocketMQ集群消息收发测试全纪录

来源:互联网 收集:自由互联 发布时间:2022-06-20
一、环境说明 ip地址 主机名 操作系统版本 RocketMQ版本 JDK版本 maven版本 备注 172.16.7.91 nameserver01 centos 7.6 4.8.0 1.8.0_291 3.6 Name Server集群 172.16.7.92 nameserver03 centos 7.6 4.8.0 1.8.0_291 3.6 Name Serv

一、环境说明

ip地址 主机名 操作系统版本 RocketMQ版本 JDK版本 maven版本 备注 172.16.7.91 nameserver01 centos 7.6 4.8.0 1.8.0_291 3.6 Name Server集群 172.16.7.92 nameserver03 centos 7.6 4.8.0 1.8.0_291 3.6 Name Server集群 172.16.7.93 master01 centos 7.6 4.8.0 1.8.0_291 3.6 Broker集群1 172.16.7.94 slave01 centos 7.6 4.8.0 1.8.0_291 3.6 Broker集群1 172.16.7.95 master02 centos 7.6 4.8.0 1.8.0_291 3.6 Broker集群2 172.16.7.96 slave02 centos 7.6 4.8.0 1.8.0_291 3.6 Broker集群2

二、部署概况

RocketMQ集群消息收发测试全纪录_rocketmq-console

三、创建Maven Project

1.新建Maven project

RocketMQ集群消息收发测试全纪录_producer_02

RocketMQ集群消息收发测试全纪录_producer_03

选择Maven Project

RocketMQ集群消息收发测试全纪录_rocketmq_04

配置目录

RocketMQ集群消息收发测试全纪录_rocketmq-console_05

选择原型

RocketMQ集群消息收发测试全纪录_consumer_06

自定义group id和artifact id,完成maven project的创建。

RocketMQ集群消息收发测试全纪录_rocketmq-console_07

2.导入依赖库

修改pom.xml,加入如下代码

  <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
  </dependency>

RocketMQ集群消息收发测试全纪录_producer_08

会发现多了很多依赖包

RocketMQ集群消息收发测试全纪录_producer_09

四、生产者测试

1.测试前集群查看

启动各节点服务,查看集群状态

RocketMQ集群消息收发测试全纪录_producer_10

测试前无消息生产和消费

2.新建topic

2.1新增主题topic_test_123

RocketMQ集群消息收发测试全纪录_consumer_11

主题配置如下:

RocketMQ集群消息收发测试全纪录_producer_12

集群名为MyRocketmq,BROKER_NAME两个broker都选择

2.2查看新增的主题

RocketMQ集群消息收发测试全纪录_consumer_13

4.新建订阅组

4.1新建订阅组group_test_123

RocketMQ集群消息收发测试全纪录_producer_14

配置如下:

RocketMQ集群消息收发测试全纪录_rocketmq-console_15

4.2查看新建的订阅组

RocketMQ集群消息收发测试全纪录_consumer_16

5.新建类Producer

RocketMQ集群消息收发测试全纪录_producer_17

RocketMQ集群消息收发测试全纪录_rocketmq_18

RocketMQ集群消息收发测试全纪录_consumer_19

新建类Producer

RocketMQ集群消息收发测试全纪录_producer_20

生产者消息发送代码:

package com.my.maven.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
  public static void main(String[] args) throws Exception {
      //Instantiate with a producer group name.
      DefaultMQProducer producer = new
          DefaultMQProducer("group_test_123");
      // Specify name server addresses.
      producer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");
      producer.setRetryTimesWhenSendAsyncFailed(2);
      //Launch the instance.
      producer.start();
       for (int i = 0; i < 100; i++) {
          //Create a message instance, specifying topic, tag and message body.
          Message msg = new Message("topic_test_123" /* Topic */,
               "TagA" /* Tag */,
              ("Message Test" +
                  i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
          );
          //Call send message to deliver message to one of brokers.
          SendResult sendResult = producer.send(msg);
          System.out.printf("%s%n", sendResult);
      }
      //Shut down once the producer instance is not longer in use.
      producer.shutdown();
  }
}

生产者配置项 retryTimesWhenSendAsyncFailed 表示异步重试的次数,默认为 2 次,加上正常发送的1次,总共有3次发送机会。

发送消息Message Test0--Message Test99,共100条消息。

6.运行报错

运行Produce发送消息时报错,如图:

RocketMQ集群消息收发测试全纪录_rocketmq_21

解决:

RocketMQ集群消息收发测试全纪录_rocketmq-console_22

由于测试是在本地电脑虚机上进行的,同时开多个虚机和eclipse应用会占用很多内存,解决办法是进入eclipse的安装目录,修改文件eclipse.ini,将参数-Xms和-Xmx改小点即可。

7.运行Produce

RocketMQ集群消息收发测试全纪录_rocketmq_23

8.发送消息状态查看

8.1集群查看

RocketMQ集群消息收发测试全纪录_producer_24

可以看到broker-a和broker-b各产生了50条消息

8.2消息查看

RocketMQ集群消息收发测试全纪录_producer_25

消息详情:

RocketMQ集群消息收发测试全纪录_consumer_26

8.3消费者查看

RocketMQ集群消息收发测试全纪录_rocketmq-console_27

此时还未消费

五、消费者测试

1.新建类Consumer

RocketMQ集群消息收发测试全纪录_rocketmq_28

消费代码:

package com.my.maven.rocketmq;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {

  public static void main(String[] args) throws InterruptedException,
          MQClientException {

      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
               "group_test_123");
      consumer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");

      consumer.subscribe("topic_test_123", "TagA || TagB");
      consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
      consumer.registerMessageListener(new MessageListenerConcurrently() {

          public ConsumeConcurrentlyStatus consumeMessage(
                  List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
              System.out.println(Thread.currentThread().getName()
                       + " Receive New Messages: " + msgs);
              MessageExt msg = msgs.get(0);
               if (msg.getTopic().equals("topic_test_123")) {
                   if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                      // 获取消息体
                      String message = new String(msg.getBody());
                      System.out.println("receive TagA message:" + message);
                  } else if (msg.getTags() != null
                          && msg.getTags().equals("TagB")) {
                      // 获取消息体
                      String message = new String(msg.getBody());
                      System.out.println("receive TagB message:" + message);
                  }

              }
              // 成功
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
      });
      consumer.start();
      System.out.println("Consumer Started.");
  }

}

2.运行Consumer

RocketMQ集群消息收发测试全纪录_rocketmq_29

3.消费消息状态查看

3.1消费者查看

RocketMQ集群消息收发测试全纪录_consumer_30

3.2查看消费详情

RocketMQ集群消息收发测试全纪录_consumer_31

3.3集群查看

RocketMQ集群消息收发测试全纪录_rocketmq-console_32

3.4消息详情查看

RocketMQ集群消息收发测试全纪录_rocketmq-console_33

发现消息已被消费

4消费者console日志

RocketMQ集群消息收发测试全纪录_producer_34

一共100条消息被消费

 

本文所有代码和配置文件已上传github:RocketMQ_Message_Test

单机版RocketMQ搭建详见:Centos7.6搭建RocketMQ4.8全纪录

集群版RocketMQ搭建详见:RocketMQ4.8集群搭建全纪录

集群启停详见:RocketMQ集群启停手册

RocketMQ集群消息收发测试全纪录_rocketmq_35

上一篇:Linux云计算-06_Linux磁盘管理
下一篇:没有了
网友评论