整体 分析: 需确保一发一存一消费这些过程均无消息丢失 利用 ACK机制 保证每个阶段需要执行的操作成功后,再往下一个阶段推动(放行) 消息处理过程: 由上图分析可知: 消息丢失
需确保一发一存一消费这些过程均无消息丢失
利用ACK机制保证每个阶段需要执行的操作成功后,再往下一个阶段推动(放行)
消息处理过程:
由上图分析可知:
消息丢失,可能发生在三个阶段,生产阶段、存储阶段、消费阶段
如下,为每个阶段保证消息不丢失:
消息生产阶段:
利用MQ的ack确认机制,在try-catch中处理好Broker的返回值,如果返回失败,则进行重试,若重试次数过多,则进行报警日志打印,排查解决问题
消息存储阶段:
刷盘存储的消息进行多副本备份处理,从高可用角度取设计中间件,搭建集群;同时,中间件也会进行备份,至少两个节点以上备份成功之后才会给生产者返回ack确认消息
消息消费阶段:
消费者从消费队列中拉去消息后,不是立马给Broker返回ack确认消息,而是等待业务代码顺利执行完成之后,再给Broker返回ack确认消息
实现: Producer——>Broker-
发送方式
-
同步发送
- Producer向broker发送消息,会阻塞当前线程等待broker响应结果
public class SyncProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
-
异步发送
- Producer首先构建一个向broker发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果
public class AsyncProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; // 根据消息数量实例化倒计时计算器 final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount); for (int i = 0; i < messageCount; i++) { final int index = i; // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // SendCallback接收异步返回结果的回调 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } // 等待5s countDownLatch.await(5, TimeUnit.SECONDS); // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
-
Oneway
- Oneway方式只负责发送请求,不等待应答,Producer只负责把请求发出去,不会处理响应结果
public class OnewayProducer { public static void main(String[] args) throws Exception{ // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 发送单向消息,没有任何返回结果 producer.sendOneway(msg); } // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
-