当前位置 : 主页 > 网络编程 > 其它编程 >

SpringCloud学习之路(四):SpringCloud整合RabbitMq(一)RabbitMq基本原理及自动创建并监听队列

来源:互联网 收集:自由互联 发布时间:2023-07-02
前言什么是RabbitMqrabbitMq是消息队列的一种那么问题来了什么是消息队列呢戳这里消息队列中间件是大型系统中的重要 前言 什么是RabbitMq rabbitMq是消息队列的一种; 那么问题来了,什么
前言什么是RabbitMqrabbitMq是消息队列的一种那么问题来了什么是消息队列呢戳这里消息队列中间件是大型系统中的重要

前言

什么是RabbitMq

rabbitMq是消息队列的一种;

那么问题来了,什么是消息队列呢?戳这里

消息队列中间件是大型系统中的重要组件,它具有松耦合、异步消息、流量削峰、可靠投递、广播、流量控制、最终一致性等一系列功能,已经成为异步RPC的主要手段之一。 目前常见的消息中间件有ActiveMQ、RabbitMQ、ZeroMQ等。

我也想写的详细,写得多,写得好,但是实力不允许

 

所以请看下面

ActiveMQ、RabbitMQ、ZeroMQ三者比较

论综合实力,RabbitMq为最佳。(不要问,公司在用这个,强行解释一波)

首先,我们要安装rabbitMq

RabbitMq的安装与下载

 

rabbitMq 基本使用

 

rabbitMq 交换机类型及基本使用

 

简单的一个消息推送到接收的流程图

名词解释

Queue: 到这,大家基本都知道RabbitMQ就是消息队列的一种实现,那么围绕这个,我们就可以思考一个消息队列到底需要什么,当然是需要队列,那么这个队列就是Queue。Queue是RabbitMQ的内部对象,用于存储消息。可以指定name来唯一确定。

Exchange:交换机,接收消息,并根据路由键转发消息到绑定的队列,即通过binding-key 与 routing-key 的匹配关系来决定将消息分发到指定queue

RoutingKey:路由键,将消息路由到指定的队列,Exchange和Queue建立绑定关系的key,根据路由键,消息到交换机的时候

 

以上都是基本的使用,创建的时候都是用生产者去创建queue的,但是我们一般在正式的使用场景中,用的最多(我用的最多)的还是消费者自动创建交换机,路由key 及 queue。

 

在开始项目之前,我们需要做三件事:

1.添加依赖

 

org.springframework.bootspring-boot-starter-amqp

 

2. 了解两个注解:@RabbitHandler @RabbitListener()

@RabbitHandler:这个注解没什么意义,仅仅作为标识处理注解。可加可不加,建议是加上,不要问为什么,问也不知道,直接加上,不会错;

@RabbitHandler注解没找到解释,上面解释仅根据某翻译提供的内容个人理解的!

 

@RabbitListener():这个表示监听指定的queue,也可以自动创建并监听queue。 划重点!

 

Spring-rabbit之@RabbitListener解析

有2种方式配置RabbitListener

一种使用RabbitListener标注类且使用RabbitHandler标注方法,

另一种使用RabbitListener标注方法,

两种模式同时存在也可以。

使用RabbitHandler标注的方法和RabbitListener标注的方法都会被当做消费者的消息监听方法

 

 

3. 消息发送和接受确认(ACK):

默认情况下如果一个 消息 被消费者所正确接收则会被从 Queue 中移除,这种行为就叫做消息确认(ACK)

ACK分为自动ACK和手动ACK

自动ACK:消费者一旦接收消息,自动ACK

手动ACK:消息接收后,不会ACK,需调用 basicAck 方法

 

敲黑板,划重点:ack最好手动,自动ACK接收消息后就自动ACK,消息易丢失,万一这时候消费者服务挂掉,消息直接丢!!

当然,具体还是要看业务,一般来说,自动ACK比手动ACK要效率要高。

ACK解析

 

但是我们在实际开发中,ACK的时候没有上述资料说的那么复杂,

直接手动调用basicAck方法即可 :  channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

4.1  消费者(consumer-demo.yml) 配置相关参数:

spring:rabbitmq:host: 127.0.0.1port: 5672# 访问帐号密码不可使用guest 需要新建username: textpassword: text#虚拟主机,可自定义virtual-host: /#生产者与broker之间的消息确认称为public confirms,#public confirms机制用于解决生产者与Rabbitmq服务器之间消息可靠传输,它在消息服务器持久化消息后通知消息生产者发送成功。#发送确认publisher-confirms: true#如果在配置文件中没有设置这个ACK确认,那么消费者每次重启都会收到这个消息。可以结合confirm使用,处理生产者和消费者丢数据的问题。listener:# 消息发送确认 : AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认direct:acknowledge-mode: manual#消息接收确认模式: AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认simple:acknowledge-mode: manual#数字是几就是几个线程监听队列concurrency: 3#交换机配置rabbit:queue:name: text

 

4.2  生产者(product-demo.yml) 配置相关参数:

 

spring:rabbitmq:host: 127.0.0.1port: 5672# 访问帐号密码不可使用guest 需要新建username: etspassword: ets@123#虚拟主机virtual-host: /#生产者与broker之间的消息确认称为public confirms,#public confirms机制用于解决生产者与Rabbitmq服务器之间消息可靠传输,它在消息服务器持久化消息后通知消息生产者发送成功。#发送确认publisher-confirms: true#接收确认,默认关闭,建议false或不配置,在代码中根据实际情况进行ack销毁publisher-returns: true#如果在配置文件中没有设置这个ACK确认,那么消费者每次重启都会收到这个消息。可以结合confirm使用,处理生产者和消费者丢数据的问题listener:# 消息发送确认 : AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认direct:acknowledge-mode: manual#消息接收确认模式: AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认simple:acknowledge-mode: manual#数字是几就是几个线程监听队列concurrency: 3

 

配置文件注释有点详细,不要在意这些细节!不详细看不懂啊!

 

 

 

 

消费者和生产者的配置基本差不多。

一个生产者对应多个消费者(我用的toptic模式,看具体业务 : direct,直连,1:1;toptic,主题,1:n;fanout,广播,n:n),所以,生产者这边的配置文件不能配置自定义Exchange名称,需动态获取。当然了,direct模式下,自定义和动态创建都是可以的!

消费者这边的接收确认根据业务需求进行手动ack,所以  publisher-returns(接收消息确认,默认false) 默认就行,生产者这边则需要配置publisher-returns 为true ,自动确认。

以上为个人理解。有误请指出,谢谢!

 

 

 

 

------------------基本知识聊完,看下面,好戏开场了---------------------

RabbitMq四中交换机

 

打开我们项目,就前三集那个项目:

 

在consumer-demo 项目新建以下目录(在任意一个类都行,这是把rabbitMq的放在一起):

 

新建个方法,开始写:

 

@RabbitHandler@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbit.queue.name}"+"-exchange",type = "topic"),value = @Queue(value = "${rabbit.queue.name}"+"-text-queue",durable = "true"),key = "${rabbit.queue.name}"+"-text-queue"))public void textRabbit(Channel channel,Message message){}

 

这里解释下各个名词:

 bindings:绑定,自动声明。不能和queue同时指定,否则报错

 @QueueBinding  : queue绑定

 exchange : 交换机指定

 @Exchange: 中的value是交换机名称,type为交换机类型

 @Queue:   中的value是queue名称,durable :持久化

 key:Routingkey。

 

(个人猜测理解,有误请指出, 感谢!)

Message: 看名字就知道,消息内容。直接   message.getBody()就可以取到消息。

Channel:  信道,TCP虚拟连接, AMQP的命令都是通过信道发送出去的,每条信道都会被指派一个唯一ID。一个TCP连接,对应多个信道,理论上无限制,减少TCP创建和销毁的开销,实现共用TCP的效果。但是一个生产者或一个消费者与MQ服务器之间只有一条TCP连接

 

完整方法:

@RabbitHandler@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "${rabbit.queue.name}"+"-exchange",type = "topic"),value = @Queue(value = "${rabbit.queue.name}"+"-text-queue",durable = "true"),key = "${rabbit.queue.name}"+"-text-queue"))public void textRabbit(Channel channel,Message message){try {//msg就是队列内容,需自己处理String msg=new String(message.getBody(),"UTF-8");System.out.println(msg);} catch (Exception e) {e.printStackTrace();}finally {try {//ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(), true);} catch (IOException e) {e.printStackTrace();}}}

注意:

1.创建queue和key唯一,相同会直接报错!

2.别忘了ACK!!原因上面有!下面也有!

RabbitMq的Ack机制  

一个是message(消息实体),一个是channel就是当前的通道,很多地方都没有说清楚怎么去手动ack,其实手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

 

 

--------到这基本consumer的代码就结束了----------

 

下面来看product-demo:

首先和consumer-demo一样,创建一个单独的类

 

然后注入 RabbitTemplate,这个直接注入就行

 

@AutowiredRabbitTemplate rabbitTemplate;

 

在源码中,RabbitTemplate实现RabbitOperations,RabbitOperations实现了AmqpTemplate

 

 

 

 

我们直接调用convertAndSend方法来发送消息 ,convertAndSend能获取请求消息队列所返回的参数(反正根据需求来)

参数格式为:convertAndSend(Exchange,RouteKey,Message,ID)

Exchange,RouteKey都是消费者自动创建的,所以在生产者这里是动态的!消费者会根据不同业务(不同方法)创建Exchange,RouteKey,Queue,生产者这边根据Exchange,RouteKey就可以找到指定的Queue,把消息存在queue中

ID为消息唯一ID

以上根据需求自行选择参数!非必填!

这里由于是测试的,就用固定值了,想试试动态的话,自己封装公共方法自定义参数即可:

 

@Componentpublic class ProductRabbit {@AutowiredRabbitTemplate rabbitTemplate;public String sendAndReceive() {rabbitTemplate.setReplyTimeout(1000 * 10);CorrelationData correlatiOnData= new CorrelationData();//消息唯一IDcorrelationData.setId(UUID.randomUUID().toString());Map map=new HashMap();map.put("name","测试");map.put("sex",1);//sendMsg,需自己处理String sendMsg = JSON.toJSONString(JSON.toJSONString(map));try {Object replyMessage = rabbitTemplate.convertSendAndReceive("text-exchange", "text-queue", sendMsg, correlationData);if (replyMessage == null) {//可自行处理return "返回为null";} else {return replyMessage.toString();}} catch (Exception ex) {//自行处理异常return "系统异常";}}}

 

我们只需在另外的方法中调用这个即可!

 

 

 

附配置文件完整版(nacos上的配置文件):

consumer-application.yaml

 

spring:application:name: consumer-applicationrabbitmq:host: 127.0.0.1port: 5672# 访问帐号密码不可使用guest 需要新建username: textpassword: text#虚拟主机virtual-host: /#生产者与broker之间的消息确认称为public confirms,#public confirms机制用于解决生产者与Rabbitmq服务器之间消息可靠传输,它在消息服务器持久化消息后通知消息生产者发送成功。#发送确认publisher-confirms: true#接收确认,默认关闭,建议false或不配置,在代码中根据实际情况进行ack销毁publisher-returns: false#如果在配置文件中没有设置这个ACK确认,那么消费者每次重启都会收到这个消息。可以结合confirm使用,处理生产者和消费者丢数据的问题listener:# 消息发送确认 : AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认direct:acknowledge-mode: manual#消息接收确认模式: AcknowledgeMode.NONE:不确认,AcknowledgeMode.AUTO:自动确认,AcknowledgeMode.MANUAL:手动确认simple:acknowledge-mode: manual#数字是几就是几个线程监听队列concurrency: 3#交换机配置rabbit:queue:name: textserver:port: 8096

 

全部配置结束,我们先启动consumer-demo, 启动后会自动创建一个queue!

 

点进去

 

 

 

我们关闭consumer-demo,打开product-demo,访问,返回OK!(不关闭consumer-demo的话,消息会直接被ACK!)

 

打开刚才的rabbitMq管理页面,会看到消息收到了!

三个单词分别表示:

Ready:待消费的消息总数。

Unacked:待应答的消息总数。

Total:总数 Ready+Unacked。

 

 

 

在页面找到 get message 点击 :  会看到发送的内容

 

 

我们启动consumer-demo

 

 

打开刚才的rabbitMq管理页面,会发现消息没了,而我们消费者已经拿到了数据,消息确认了,队列的消息就木了。

 

 

如果我们把ack去掉呢?

请看:

 

 

 

如果不进行ack的话,可能无论多少次,获取到的消息都不会消失(当然,rabbitMq有机制,到一定次数就不会推送了!!),这种情况在项目中可能会存在重复数据的可能性,根据需求进行选择。

 

 

----------------------------------------THE END --------------------------------

 

纯学习笔记,有误指出,谢谢!

 

昨天用家里电脑装rabbitMq翻车了

 

 

用公司电脑的环境弄的。晚上回去再试试。。。。。。

上一篇:Abaqus空间管路自动化建模与分析
下一篇:没有了
网友评论