当前位置 : 主页 > 编程语言 > python >

Python&Rabbitmq--持久化

来源:互联网 收集:自由互联 发布时间:2022-07-22
RabbitMQ提供了多种途径的消息持久化保证:Exchange持久化、Queue持久化及Message的持久化等。以保证RabbitMQ在重启或Crash等异常情况下,消息不会丢失。RabbitMQ提供了简单的参数配置来实现持


 

RabbitMQ提供了多种途径的消息持久化保证:Exchange持久化、Queue持久化及Message的持久化等。以保证RabbitMQ在重启或Crash等异常情况下,消息不会丢失。RabbitMQ提供了简单的参数配置来实现持久化操作。

Queue持久化:队列是我们使用RabbitMQ进行数据传输的最多使用的方式,是进行点对点消息传递使用最多的方式。队列的持久化是通过durable=true 来实现。

# durable 持久化
# auto_delete 是否自动删除queue 当还一个消费者断开连接
# 切换到指定的队列中,如果队列不存在,则创建
channel.queue_declare(queue='hello', durable=True, exclusive=False, auto_delete=False)

参数说明:

1. queue:队列名称。

2. durable:设置是否执行持久化。如果设置为true,即durable=true,持久化实现的重要参数

3. exclusive:指示队列是否是排他性。如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。

需要注意:

1. 排他队列是基于连接可见的,同一连接的不同信道Channel是可以同时访问同一连接创建的排他队列;

2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;

3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。

4. autoDelete:是否自动删除。如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于发布订阅方式创建的临时队列。

消息的持久化:如果将一个队列设置为持久化,那么会创建一个持久化的队列,但并不意味着队列中的消息也会持久化存储。因此如果要保证消息在RabbitMQ出现异常时不会丢失,需要设定消息的持久化。

简要说明一下消息持久化和队列持久化的联系:

队列设置为持久化,那么在RabbitMQ重启之后,持久化的队列也会存在,并会保持和重启前一致的队列参数。

消息设置为持久化,在RabbitMQ重启之后,持久化的消息也会存在。

那么就会出现一些矛盾的地方:

1、因为消息必须依附于队列存在才有意义,那么如果队列设置为非持久化,而消息设置为持久化。在RabbitMQ重启之后,持久化的消息是否还存在呢?因为非持久化的队列可能并不存在。

2、如果设置消息持久化为true,但队列设置成排他性队列,那么在RabbitMQ重启之后,消息是否仍然存在。请自行查找分析,下次分析该问题。

通过RabbitMQ SDK发送消息至MQ非常简单,通过BasicPublish即可。

 BasicPublish 的定义:

channel.basic_publish(exchange='', routing_key='hello', body=msg,
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
)

设置消息持久化,需要设置BasicProperties的delivery_mode=2 (设置 1为不持久化 , 设置 2为持久化).

注意:

设置了队列和消息持久化后,当服务重启之后,消息仍然存在。只设置队列持久化,不设置消息持久化,重启之后消息会丢失;只设置消息持久化,不设置队列持久化,在服务重启后,队列会消失,从而依附于队列的消息也会丢失。只设置消息持久化而不设置队列的持久化,毫无意义。

Exchange持久化:

为了实现一对多的消息发送,我们一般会采用发布订阅模式,通过一个发送端、多个订阅端来实现消息的分发。

Python&Rabbitmq--持久化_重启

发布订阅模式存在一些问题:

1、如果消费者由于网络或其他原因,与RabbitMQ的连接断开,那么RabbitMQ会自动将与其对应的队列删除,当消息程序重新连接以后,无法获取断开前未来得及消费的消息。

2、如果RabbitMQ出现故障或Crash,那么在RabbitMQ  服务重启之后,消费端未及时消费的消息也会丢失,并且如果Exchange 不设置成持久化,那么在MQ服务重启之后,Exchange也不会存在。

# 创建exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout',durable=True, auto_delete=False)

参数说明:exchange:RabbitMQ中定义的Exchange名称,exchange_type:类型,包含fanout、topic、direct、headers,durable:持久化设置。设置成true,就可以设定exchange持久化存储,autodelete:是否自动删除

总结:

  • Exchange 持久化:如果不设定Exchange持久化,那么在RabbitMQ由于某些异常等原因重启之后,Exchange会丢失。Exchange丢失, 会影响发送端发送消息到RabbitMQ。
  • Queue持久化:发送端将消息发送至Exchange,Exchange将消息转发至关联的Queue。如果Queue不设置持久化,那么在RabbitMQ重启之后,Queue信息会丢失。导致消息发送至Exchange,但Exchange不知道需要将该消息发送至哪些具体的队列。
  • Message持久化:发送端将消息发送至Exchange,Exchange将消息转发至关联的Queue,消息存储于具体的Queue中。如果RabbitMQ重启之后,由于Message未设置持久化,那么消息会在重启之后丢失。
  • 为了保证发布订阅的持久化,必须设置Exchange、Queue、Message的持久化,才可以保证消息最终不会丢失。
  • 虽然持久化会造成性能损耗,但为了生产环境的数据一致性,这是必须做出的选择。但可以通过设置消息过期时间、降低发送消息大小等其他方式来尽可能的降低MQ性能的降低。

 

拓展:

将Exchange、Queue、Message都设置持久化,能保证消息100%会被成功消费吗?

答案肯定是否,天下没有绝对的事情,尤其是复杂的MQ。

一、如果消息的自动确认为true,那么在消息被接收以后,RabbitMQ就会删除该消息,假如消费端此时宕机,那么消息就会丢失。因此需要将消息设置为手动确认。

二、设置手动确认会出现另一个问题,如果消息已被成功处理,但在消息确认过程中出现问题,那么在消费端重启后,消息会重新被消费。

三、发送端为了保证消息会成功投递,一般会设定重试。如果消息发送至RabbitMQ之后,在RabbitMQ回复已成功接收消息过程中出现异常,那么发送端会重新发送该消息,从而造成消息重复发送。

四、RabbitMQ的消息磁盘写入,如果出现问题,也会造成消息丢失。

网友评论