关于rabbitmq的一些问题
一、RabbitMQ的完整工作流程
RabbitMQ 的核心是生产者将消息发送到交换机,交换机根据类型和路由键将消息路由到队列,消费者再从队列中获取消息。
核心组件:
Producer(生产者): 发送消息的应用程序。
Consumer(消费者): 接收消息的应用程序。
Message(消息): 包含有效载荷(数据)和标签(元数据,如路由键)。
Exchange(交换机): 接收生产者发送的消息,并根据特定规则(交换机类型、绑定、路由键)将消息路由到一个或多个队列。
Queue(队列): 存储消息的缓冲区,等待消费者消费。
Binding(绑定): 连接交换机和队列的规则。
完整流程:
建立连接:
生产者/消费者与 RabbitMQ Broker 建立一个 TCP 连接。
在连接上创建一个 Channel(信道),Channel 是轻量级的连接,避免了频繁创建/销毁 TCP 开开销。
生产者发送消息:
生产者将消息发送到指定的 Exchange。
发送消息时,必须指定一个 Routing Key(路由键)。
交换机路由消息:
Exchange 接收到消息后,根据自身的 类型 和 Binding 规则,决定将消息投递到哪些队列。
主要的交换机类型:
Direct: 精确匹配 Routing Key。消息只会被投递到 Binding Key 与 Routing Key 完全一致 的队列。
Fanout: 广播模式。将消息投递到所有绑定到该 Exchange 的队列,忽略 Routing Key。
Topic: 模式匹配。使用通配符(
*匹配一个词,#匹配零个或多个词)来匹配 Routing Key 和 Binding Key。Headers: 通过消息的 Header 属性进行匹配,而非 Routing Key(不常用)。
消息存入队列:
根据交换机的路由结果,消息被放入一个或多个队列中。
队列是消息的最终目的地,如果没有队列绑定到 Exchange,或者没有匹配的队列,消息会被丢弃(或返回给生产者,取决于配置)。
消费者消费消息:
消费者订阅一个队列。
当队列中有消息时,RabbitMQ 可以将消息推(Push)给消费者,或者消费者主动拉(Pull)取消息。
消费者处理完消息后,需要向 RabbitMQ 发送一个确认(Acknowledgment)。
消息确认:
自动确认: 消息一发送给消费者就被认为已成功投递。风险高,如果消费者处理失败,消息就丢失了。
手动确认: 消费者在处理完消息后,必须显式地向 RabbitMQ 发送一个
basicAck命令。如果处理失败,可以发送basicNack或basicReject命令,让消息重新入队或进入死信队列。这是生产环境推荐的方式。
二、 如何保证消息幂等性
幂等性 是指消费者对同一条消息的多次消费,产生的结果与一次消费相同。
RabbitMQ 本身不提供幂等性保证(因为消息可能被重新投递),这需要由消费者来实现。
常见的解决方案:
唯一标识符:
生产者为每条消息生成一个全局唯一的 ID(如 UUID、业务主键等)。
消费者在处理消息前,先检查这个 ID 是否已经被处理过。
实现机制:
数据库唯一键: 在消费前,执行
INSERT操作,将消息 ID 插入一张表。利用数据库的唯一键约束,如果重复消费,插入会失败。Redis 原子操作: 使用
SET key value NX命令。如果 key(消息ID)不存在则设置成功,返回1;如果已存在,则设置失败,返回0。利用其原子性来判断是否已消费。
三、 如何确保不出现消息丢失
要确保消息不丢失,需要在生产者、Broker(RabbitMQ)和消费者三个环节都做好保障。
1. 生产者端确保消息发送成功
开启事务(Transaction): 性能差,不推荐。
开启确认模式(Publisher Confirm): 生产环境推荐。
生产者将信道设置为
confirm模式。所有在该信道上发布的消息都会被分配一个唯一的 ID。
一旦消息被投递到所有匹配的队列,RabbitMQ 会发送一个
basic.ack给生产者(包含消息的 ID),表示消息已成功接收。如果 RabbitMQ 内部错误导致消息丢失,会发送一个
basic.nack,生产者可以据此进行重发。
持久化消息: 在发送消息时,设置
delivery_mode=2,将消息标记为持久化。
2. RabbitMQ Broker 端确保消息不丢失
队列持久化: 声明队列时,设置
durable=True。这样即使 RabbitMQ 重启,队列本身也不会丢失。消息持久化: 如上所述,在生产者端设置
delivery_mode=2。注意: 仅当消息被投递到持久化的队列时,它才会被写入磁盘。镜像队列: 通过 RabbitMQ 的集群和镜像队列功能,将队列的消息和状态复制到集群中的多个节点上。即使一个节点崩溃,其他节点仍然可以提供服务。
3. 消费者端确保消息被成功处理
关闭自动确认,使用手动确认: 这是最关键的一步。
只有在消费者完整处理完业务逻辑后,才向 RabbitMQ 发送
basicAck。如果消费者在处理过程中崩溃(没有发送 ACK),RabbitMQ 会认为该消息处理失败,从而将其重新放入队列(如果设置了
requeue=true),投递给另一个消费者。
总结一个完整的“不丢失”配置:
生产者:
使用
Publisher Confirm机制,确保消息到达 Broker。发送消息时设置
mandatory=true或使用备用交换机,防止消息因无路由而丢失。设置
delivery_mode=2,使消息持久化。
Broker:
交换机、队列声明为持久化的(
durable=true)。部署镜像队列,实现高可用。
消费者:
使用手动确认模式,确保业务处理成功后再 ACK。
实现幂等性,以防消息重复消费。
通过这三个环节的协同保障,可以构建一个非常可靠的消息传递系统,最大限度地防止消息丢失。