最近在项目开发过程中会碰到一些消息队列相关的事情,包括在学习一些开源项目如eShopOnContainers过程中也发现使用了消息队列(RabbitMQ)来实现EventBus,所以准备在空余时间学一学写一写关于RabbitMQ入门文章(当然消息队列有很多种,但感觉自身对RabbitMQ接触得还是稍微多一点),在写的同时也顺便加强加强对于RabbitMQ基础知识的掌握与理解,做一些技术储备。正所谓"万丈高楼平地起,勿在浮沙筑高台"。
消息(Message):是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
队列:可以说是一个数据结构,可以存储数据,如下图,我们从右侧(队尾)插入元素(入队),从队头获取元素(出队)。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。
消息队列主要解决了以下问题:
- 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;
- 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;
- 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;
例:当我们在秒杀活动场景时,比如你的服务器一秒能处理100个订单,但秒杀活动1秒进来1000个订单,持续10秒,在后端能力无法增加的情况下,你可以用消息队列将总共10000个请求压在队列里,后台消费服务按原有能力处理,比如100多秒后处理完所有请求,在这种情况下后台服务消费可能会比正常慢一点,但是不至于让系统直接宕机丢失订单数据。
上述例子实际上是用到了消息队列的限流削峰特点,它可以避免我们的系统在流量过大的情况下直接宕机,保障了系统的稳定性
RabbitMQ——Rabbit Message Queue的简写,但不能仅仅理解其为消息队列,消息代理更合适。RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议)的开源实现,其内部结构如下:
RabbitMQ作为一个消息代理,主要和消息打交道,负责接收并转发消息。
RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。
下面我们先来看看其内部结构中出现的角色承担者的作用:
生产者创建消息,然后发布到RabbitMQ中。消息一般可以包含2个部分:消息体和标签(Label)。消息体也可以称之为payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。
Consumer 消费者消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,也就不知道消息的生产者是谁。
Broker:消息中间件的服务节点对于RabbitMQ来说,一个RabbitMQ Broker可以简单地看作一个RabbitMQ服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个RabbitMQ Broker看作一台RabbitMQ服务器。
Queue:队列,是RabbitMQ的内部对象,用于存储消息RabbitMQ中消息都只能存储在队列中,多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,如图
例:
Exchange:交换器//声明一个队列
channel.QueueDeclare(queueName, false, false, false, null);
RabbitMQ中生产者将消息发送到Exchange(交换器,通常也可以用大写的“X”来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。
RabbitMQ有4种交换器:
- Direct exchange
- Fanout exchange
- Topic exchange
- Headers exchange
其中前3者比较常用
后面会通过代码分别演示这几个交换器的用法
例:
Binding:绑定//创建一个非持久化的、非自动删除的、绑定类型为direct的交换器
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个Key,告诉RabbitMQ如何正确地将消息路由到队列
Connection:连接//将队列与交换机绑定
channel.QueueBind(queueName, exchangeName, routeKey);
表示到消息代理的真实 TCP 连接
Channel:信道多路复用连接中的一条独立的双向数据流通道,可读可写。
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作都是在Channel这个接口中完成的,包括定义Queue、Exchange、绑定Queue与Exchange、发布消息等等。在我们进行这些操作之前我们都需要与RabbitMQ建立一个Connection(Connection 表示到消息代理的真实 TCP 连接),但是如果每次访问RabbitMQ都需要建立Connection的话,在消息量大的时候建立TCP Connection的开销无疑也是巨大的,效率也比较低。这时候Channel起了作用。Channel是Connection中的虚拟连接(AMPQ连接),Channel可以复用Connection的TCP连接,当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。但是当信道的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,就需要开辟多个Connection,将这些信道均摊到这些Connection中。
感兴趣的同学可以移步官网:Channels
当我们配置好本地环境后使用账号密码guest、guest登录时就是在访问默认的VHost("/")了。
RabbitMQ中的资源权限是针对每个Virtual Host的,用户没有全局权限,只有一个或多个的Virtual Host权限。每一个Virtual Host就相当于一个小型的RabbitMQ服务器,拥有自己的交换机、队列、绑定等、拥有自己的一套权限机制。VHost之间是相互独立的,它既能将同一个RabbitMQ的众多客户区分开来,又可以避免队列和交换器的命名冲突。
感兴趣的同学可以移步官网:Virtual Hosts
本文主要在Windows平台基于.Net 6 使用VS2022进行Demo演示,所以我们需要准备以下条件:
- 安装Erlang运行环境 erlang.download
- 安装RabbitMQ rabbitmq.download
- VS2022
- .Net 6
RabbitMQ安装好后访问http://localhost:15672/ 会看到以下界面
默认账号密码:guest,guest
登录成功后会看到以下界面
接下来我们先创建两个控制台项目RabbitMQGetStarted与Consumer,分别当作生产者与消费者
然后装一下RabbitMQ.Client NuGet包
我们先来看看RabbitMQ有哪几种消费模型:
大致可以分为3类:
- 基本消费模型(1)
- Work消息模型(2)
- 订阅模型(3,4,5)
第6个属于RPC暂时不管。
我们先来看下官网的例子:
您可以将其视为邮局:当您将要邮寄的邮件放入邮箱时,您可以确定邮递员最终会将邮件递送给您的收件人。在这个类比中,RabbitMQ 是一个邮箱、一个邮局和一个邮递员。
RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块 -消息。
P:生产者。一个发送消息的用户应用程序
C:消费者。消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
中间的框是一个队列:一个 RabbitMQ 代表消费者保存的消息缓冲区。
队列是位于 RabbitMQ 中的邮箱的名称。尽管消息流经 RabbitMQ 和您的应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制,它本质上是一个大的消息缓冲区。许多生产者可以发送去一个队列的消息,许多消费者可以尝试从一个队列接收数据。这就是我们表示队列的方式:
环境与项目都搭建好之后,我们来实现一个最基础的消息队列模型:
我们先来实现生产者的逻辑:
直接上代码:RabbitMQGetStarted Program.cs(生产者)
我们来分析生产者发送消息的过程:
-
首先需要创建到服务器的连接,在这个过程中会处理一些版本协议和身份认证等,在这里我们连接的是本机的RabbitMQ节点-因此是HostName是localhost,如果我们想连接到另一台机器的节点,我们只需在此处指定其主机名或IP地址即可。然后我们这里并没有创建其他的虚拟主机,而是直接访问默认的虚拟主机。如果有需要我们可以自主创建。
-
接下来我们需要与服务器建立一个TCP连接并创建一个通道,用于后面声明队列等操作
-
声明一个队列,由于此处未声明交换机,队列最自动绑定到默认的direct类型的交换机,上述图中已有讲解。
如果有需要我们可以自己创建不同类型的交换机,并将队列与交换机绑定;
例:
-
然后我们通过BasicPublish方法发送消息,通过源码我们看到此方法有多个重载
我们简单看下方法参数的介绍:
我们此处使用默认的交换器,所以:
exchange 绑定的是空字符串
routingKey 直接绑定的是队列的名称"hello1" -
最后当生产者完成发送消息的任务后,通道和连接将被释放,这就是我们的生产者。
直接上代码:Consumer Program.cs(消费者)
我们再来分析消费者接受消息的过程:
- 消费者的连接配置与生产者相同,我们打开一个连接和一个通道,并声明我们将要消费的队列。请注意,这与生产者发布到的队列相匹配。
- 然后通过EventingBasicConsumer.Received事件处理的回调方法告诉服务器从队列中传递消息给我们(这里涉及到了RabbitMQ的两种消费模式(Push与Pull)感兴趣的同学可自行查阅,此处示例采用Push模式。)
- 消费者确认(ack)接收到的消息(这里涉及到了RabbitMQ消息确认机制知识,可参考大佬文章 .Net RabbitMQ实战指南)
- RabbitMQ会从队列中删除相应已经被确认消费的消息
- BasicConsume方法会将信道(Channel)置为接收模式,直到取消队列的订阅,RabbitMQ会不断地推送消息给消费者。
- 这就是我们的消费者
接下来我们直接启动生产者与消费者,看下效果:
在工人之间分配任务(工作队列或者竞争消费者模式)
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务而不得不等待它完成。相反,我们将任务安排在以后完成。我们将任务封装 为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工作人员时,任务将在他们之间共享。
此消息模型的好处在于如果你积压了任务,我们只需要添加更多的工作者就可以了。默认情况下,RabbitMQ 会按顺序将每条消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环(循环调度)。
下面我们基于上面的示例再来增加一个消费者。
ConsumerSecond Program.cs 直接上代码:
消费者2的代码跟消费者1代码差不多
我们同时运行生产者与消费者1、消费者2 来看下循环调度的效果:
看到这里有同学就会觉得这搞来搞去不就是一个发消息一个接受消息嘛,so easy啦!
其实不然
在实际情况中会出现各种情况 比如:
(1)工作者完成一项任务可能需要几秒钟。如果其中一个消费者开始一项长期任务并且只完成了部分任务而死去会发生什么。
(2)如果消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失)RabbitMQ将会做什么处理。
(3)当RabbitMQ遇到异常情况退出或崩溃时,我们的任务会怎么办,只能丢失吗?在重新启动RabbitMQ的时候能重新找回来吗?
(4)当我们有两个工人的情况下,当所有奇数消息很重而偶数消息都很轻时,一个工人将一直很忙,另一个工人几乎不做任何工作。这种情况要怎么处理呢。
有时候我们需要带着问题去找答案会更有收获。
其实上面这几种情况RabbitMQ都有解决方案
(1、2)消息确认
(3)消息持久性
(4)公平调度
由于文章篇幅有限,具体解决办法请同学们去官网查阅 Work
(3)订阅模型(三类)RabbitMQ 中消息传递模型的核心思想是生产者向交换器发送消息,从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。而交换器,一方面它接收来自生产者的消息,另一方面它将它们推送到队列中,交换必须确切地知道如何处理它收到的消息。
RabbitMQ 给我们提供了3种常用的交换器类型:
direct:定向,把消息交给符合指定routing key 的队列
topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
fanout:广播,将消息交给所有绑定到交换机的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
下面我们来分别介绍这3种交换器的用法
在这个设置中,我们可以看到绑定了两个队列的直接交换X。第一个队列使用绑定键orange进行绑定,第二个队列有两个绑定,一个使用绑定键black,另一个使用green。
在这样的设置中,使用路由键橙色发布到交换的消息 将被路由到队列Q1。带有黑色 或绿色路由键的消息将发送到Q2。所有其他消息将被丢弃。
具体示例代码可以去官网查看Routing
根据上方例子我们可以发现所有发送到Direct Exchange的消息被转发到具有指定RouteKey的Queue。在这种模式下消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。
我们之前的代码示例没有定义交换器,默认就是使用了Direct 交换器。
现在我们重新定义一个Direct 交换器的生产者 Producer.DirectExchange
然后跟上面图例一样声明两个队列,使用不同routeKey分别将队列与交换器绑定
Program.cs
QueueDeclare与ExchangeDeclare方法详解同学们可以参考代码注释
我们启动两个消费者"hello1"与"hello2" 然后同时启动生产者
(1)我们先使用路由键orange发布消息
我们可以看到消息将被路由到队列hello1
(2)我们再使用路由键green发布消息
我们可以看到消息将被路由到队列hello2
(3)我们再使用路由键black发布消息
我们可以看到消息也会被路由到队列hello2
这就是direct交换器
fanout的路由机制(广播)如下图,即发送到 fanout 类型exchange的消息都会分发到所有绑定该exchange的队列上去。
闲话不多说,我们直接定义一个Fanout交换器的生产者 Producer.FanoutExchange
Program.cs
我们同时启动生产者与两个消费者 直接看效果:
先来看下官方示例:
发送到主题交换的消息不能有任意的 routing_key - 它必须是单词列表,由点分隔。这些词可以是任何东西,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:"stock.usd.nyse "、"nyse.vmw "、"quick.orange.rabbit "。路由键中可以有任意多的单词,最多为 255 个字节。
绑定键也必须采用相同的格式。主题交换背后的逻辑 类似于直接交换- 使用特定路由键发送的消息将被传递到与匹配绑定键绑定的所有队列。但是,绑定键有两个重要的特殊情况:
- "*"可以只替换一个单词。
- "#"可以代替零个或多个单词。
在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个词将描述速度,第二个是颜色,第三个是物种:"
. . "。 我们创建了三个绑定:Q1 与绑定键".orange. "绑定,Q2 与" ..rabbit "和" lazy.# "绑定。
这些绑定可以概括为:
Q1 对所有橙色动物都感兴趣。
Q2 想听听关于兔子的一切,以及关于懒惰动物的一切。
路由键设置为" quick.orange.rabbit "的消息将被传递到两个队列。消息" lazy.orange.elephant "也将发送给他们两个。另一方面,"quick.orange.fox "只会进入第一个队列,而"lazy.brown.fox "只会进入第二个队列。"lazy.pink.rabbit "只会被传递到第二个队列一次,即使它匹配两个绑定。" quick.brown.fox " 不匹配任何绑定,因此将被丢弃。如果我们违反合同并发送带有一四个单词的消息,例如" orange "或" quick.orange.male.rabbit ",会发生什么?好吧,这些消息不会匹配任何绑定并且会丢失。
另一方面,"lazy.orange.male.rabbit ",即使它有四个单词,也会匹配最后一个绑定,并被传递到第二个队列。
官方示例不想看?好吧,直接安排:
Topic Exchange使用起来非常灵活,它可以通过使用通配符(*与#)来进行模糊匹配(跟我们Api中的模糊查询类似),所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上
看一下匹配规则:
- "*" 匹配一个单词
- "#" 匹配一个或多个单词
举个简单的例子:
"mamba.*" 只可以匹配到"mamba.rabbit" 这个格式的routingkey
"mamba.#" 则可以匹配到"mamba.male.rabbit" 这个格式的routingkey
代码上一上:
dotnet run
这就是我们的Topic Exchange
整篇文章其实对于RabbitMQ的介绍并没有那么完善,包括一些死信队列等等并没有介绍到,在写的过程中也有参考其他大佬的文章,如果同学们在阅读的过程发现有什么不对的地方或者在理解上有什么偏差的地方,还希望不吝赐教。本文介绍内容只针对刚入门的同学,抛砖引玉。
参考资源:
https://www.rabbitmq.com/
https://www.cnblogs.com/sheng-jie/p/7192690.html
https://www.cnblogs.com/Stacking/p/net-core-rabbitmq-concept.html