当前位置 : 主页 > 编程语言 > 其它开发 >

RocketMQ的可靠性传输

来源:互联网 收集:自由互联 发布时间:2022-05-21
整体 分析: 需确保一发一存一消费这些过程均无消息丢失 利用 ACK机制 保证每个阶段需要执行的操作成功后,再往下一个阶段推动(放行) 消息处理过程: 由上图分析可知: 消息丢失
整体

分析:

需确保一发一存一消费这些过程均无消息丢失

利用ACK机制保证每个阶段需要执行的操作成功后,再往下一个阶段推动(放行)

消息处理过程:

由上图分析可知:

消息丢失,可能发生在三个阶段,生产阶段、存储阶段、消费阶段

如下,为每个阶段保证消息不丢失:

消息生产阶段

利用MQ的ack确认机制,在try-catch中处理好Broker的返回值,如果返回失败,则进行重试,若重试次数过多,则进行报警日志打印,排查解决问题

消息存储阶段

刷盘存储的消息进行多副本备份处理,从高可用角度取设计中间件,搭建集群;同时,中间件也会进行备份,至少两个节点以上备份成功之后才会给生产者返回ack确认消息

消息消费阶段

消费者从消费队列中拉去消息后,不是立马给Broker返回ack确认消息,而是等待业务代码顺利执行完成之后,再给Broker返回ack确认消息

实现: Producer——>Broker
  • 发送方式

    • 同步发送

      • Producer向broker发送消息,会阻塞当前线程等待broker响应结果
      public class SyncProducer {
      	public static void main(String[] args) throws Exception {
          	// 实例化消息生产者Producer
              DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
          	// 设置NameServer的地址
      	    	producer.setNamesrvAddr("localhost:9876");
          	// 启动Producer实例
              producer.start();
          	for (int i = 0; i < 100; i++) {
          	    // 创建消息,并指定Topic,Tag和消息体
          	    Message msg = new Message("TopicTest" /* Topic */,
              	"TagA" /* Tag */,
              	("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
              	);
              	// 发送消息到一个Broker
                  SendResult sendResult = producer.send(msg);
                  // 通过sendResult返回消息是否成功送达
                  System.out.printf("%s%n", sendResult);
          	}
          	// 如果不再发送消息,关闭Producer实例。
          	producer.shutdown();
          }
      }
      
    • 异步发送

      • Producer首先构建一个向broker发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果
      public class AsyncProducer {
      	public static void main(String[] args) throws Exception {
          	// 实例化消息生产者Producer
              DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
          	// 设置NameServer的地址
              producer.setNamesrvAddr("localhost:9876");
          	// 启动Producer实例
              producer.start();
              producer.setRetryTimesWhenSendAsyncFailed(0);
      	
      	int messageCount = 100;
              // 根据消息数量实例化倒计时计算器
      	final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
          	for (int i = 0; i < messageCount; i++) {
                      final int index = i;
                  	// 创建消息,并指定Topic,Tag和消息体
                      Message msg = new Message("TopicTest",
                          "TagA",
                          "OrderID188",
                          "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                      // SendCallback接收异步返回结果的回调
                      producer.send(msg, new SendCallback() {
                          @Override
                          public void onSuccess(SendResult sendResult) {
                              countDownLatch.countDown();
                              System.out.printf("%-10d OK %s %n", index,
                                  sendResult.getMsgId());
                          }
                          @Override
                          public void onException(Throwable e) {
                              countDownLatch.countDown();
            	                System.out.printf("%-10d Exception %s %n", index, e);
            	                e.printStackTrace();
                          }
                  	});
          	}
      	// 等待5s
      	countDownLatch.await(5, TimeUnit.SECONDS);
          	// 如果不再发送消息,关闭Producer实例。
          	producer.shutdown();
          }
      }
      
    • Oneway

      • Oneway方式只负责发送请求,不等待应答,Producer只负责把请求发出去,不会处理响应结果
      public class OnewayProducer {
      	public static void main(String[] args) throws Exception{
          	// 实例化消息生产者Producer
              DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
          	// 设置NameServer的地址
              producer.setNamesrvAddr("localhost:9876");
          	// 启动Producer实例
              producer.start();
          	for (int i = 0; i < 100; i++) {
              	// 创建消息,并指定Topic,Tag和消息体
              	Message msg = new Message("TopicTest" /* Topic */,
                      "TagA" /* Tag */,
                      ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
              	);
              	// 发送单向消息,没有任何返回结果
              	producer.sendOneway(msg);
      
          	}
          	// 如果不再发送消息,关闭Producer实例。
          	producer.shutdown();
          }
      }
      
上一篇:5分钟学会 gRPC
下一篇:没有了
网友评论