我们可以通过 MQ 异步调用,来使程序的性能更好和解耦合。但是如果 MQ 的消息没有成功的被对应的程序处理,那么这样不就会造成数据不一致的情况。因此,我们这里必须要尽可能的确保 MQ 消息的可靠性,即:消息应该至少被消费者处理一次。
那么问题来了:
- 我们该如何确保 MQ 消息的可靠性?
- 如果真的发送失败,有没有其它的兜底方案?
相信本篇博客能给你答案。
一、发送者的可靠性
首先,我们一起分析一下消息丢失的可能性有哪些。
消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:
消息从生产者到消费者的每一步都可能导致消息丢失:
- 发送消息时丢失:
- 生产者发送消息时连接 MQ 失败。
- 生产者发送消息到达 MQ 后未找到
Exchange
- 生产者发送消息到达 MQ 的
Exchange
后,未找到合适的Queue
- MQ 导致消息丢失:
- 消息到达 MQ,保存到队列后,尚未消费就突然宕机
- 消费者处理消息时:
- 消息接收后尚未处理突然宕机
- 消息接收后处理过程中抛出异常
综上,我们要解决消息丢失问题,保证 MQ 的可靠性,就必须从 3 个方面入手:
- 确保生产者一定把消息发送到 MQ
- 确保 MQ 不会将消息弄丢
- 确保消费者一定要成功处理消息
我们先来看如何确保生产者一定能把消息发送到 MQ。
1.1 生产者重试机制:
首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与 MQ 的连接中断。
为了解决这个问题,SpringAMQP 提供的消息发送时的重试机制。即:当RabbitTemplate
与 MQ 连接超时后,多次重试。
修改publisher
模块的application.yaml
文件,添加下面的内容:
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过 SpringAMQP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
如果是断网的情况下,整个业务都会被影响,我们可以很容易的发现问题所在,并进行解决,不过断网一般是不太会出现。
1.2 生产者确认机制:
一般情况下,只要生产者与 MQ 之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
不过,在少数情况下,也会出现消息发送到 MQ 之后丢失的现象,比如:
- MQ 内部处理消息的进程发生了异常
- 生产者发送消息到达 MQ 后未找到
Exchange
- 生产者发送消息到达 MQ 的
Exchange
后,未找到合适的Queue
,因此无法路由
针对上述情况,RabbitMQ 提供了生产者消息确认机制,包括Publisher Confirm
和Publisher Return
两种。在开启确认机制的情况下,当生产者发送消息给 MQ 后,MQ 会根据消息处理的情况返回不同的回执。
具体如图所示:
总结如下:
- 当消息投递到 MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回 ack 的确认信息(因为这里是程序员自己代码写错了,如果返回 NACK 后面生产者会继续投递该消息,但是代码错误,怎么投都不会成功),代表投递成功
- 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知投递成功
- 持久消息投递到了 MQ,并且入队完成持久化,返回 ACK ,告知投递成功
- 其它情况都会返回 NACK,告知投递失败
其中ack
和nack
属于Publisher Confirm机制,ack
是投递成功;nack
是投递失败。而return
则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。
两个机制一般配合是配合使用。
1.2.1 开启生产者确认:
在 publisher 模块的application.yaml
中添加配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type
有三种模式可选:
none
:关闭 confirm 机制simple
:同步阻塞等待 MQ 的回执correlated
:MQ 异步回调返回回执
我们一般推荐使用 correlated。
1.2.2 定义 ReturnCallback:
每个RabbitTemplate
只能配置一个ReturnCallback
,因此我们可以在配置类中统一设置。
内容如下:
package com.itheima.publisher.config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
1.2.3 定义 ConfirmCallback:
由于每个消息发送时的处理逻辑不一定相同,因此 ConfirmCallback 需要在每次发消息时定义。具体来说,是在调用 RabbitTemplate 中的 convertAndSend 方法时,多传递一个参数:
这里的 CorrelationData 中包含两个核心的东西:
id
:消息的唯一标示,MQ 对不同的消息的回执以此做判断,避免混淆SettableListenableFuture
:回执结果的 Future 对象
将来 MQ 的回执就会通过这个Future
来返回,我们可以提前给CorrelationData
中的Future
添加回调函数来处理消息回执:
下面我们来演示一下:
@Test
public void testConfirmRollback() throws InterruptedException {
// 1.设置唯一 id
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
// 2.添加回调函数
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// Future 本身发生异常的时处理的逻辑,一般不会发生
log.debug("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
if (result.isAck()) {
log.debug("发送消息成功,收到 ack!");
} else {
log.error("发送消息失败,收到 nack,reason: {}", result.getReason());
// 进行消息的重发,这里没有实现
}
}
});
// 2.发送信息给 rabbit mq
rabbitTemplate.convertAndSend("hmall.direct", "blue", "hello", cd);
}
注意:
开启生产者确认比较消耗 MQ 性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
- 路由失败:一般是因为 RoutingKey 错误导致,往往是编程导致
- 交换机名称错误:同样是编程错误导致
- MQ 内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启 ConfirmCallback 处理 nack 就可以了。
二、MQ 的可靠性
消息到达 MQ 以后,如果 MQ 不能及时保存,也会导致消息丢失,所以 MQ 的可靠性也非常重要。
2.1 数据持久化:
为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:
- 交换机持久化
- 队列持久化
- 消息持久化
我们下面以 Java 代码来进行演示:
2.1.1 交换机持久化:
下面演示 FanoutExchange 其它的交换机也都是类似的。
使第二个参数 durable 为 true。
@Bean
public FanoutExchange fanoutExchange(){
// 第一个参数 name 表示交换机名称
// 第二个参数 durable 表示是否持久化
// 第三个参数 autoDelete 表示是否自动删除
return new FanoutExchange("hmall.fanout", true, false);
}
2.1.2 队列持久化:
使第二个参数 durable 为 true。
@Bean
public Queue fanoutQueue(){
return new Queue("fanout.queue", true);
}
2.1.3 消息持久化:
对应的 API 如下:
// 发送持久化的消息给 rabbit mq
rabbitTemplate.convertAndSend("队列名", "路由名", "消息内容",
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
});
说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么 MQ 会在消息持久化以后才发送 ACK 回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少 IO 次数,发送到 MQ 的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100 毫秒左右,这就会导致 ACK 有一定的延迟,因此建议生产者确认全部采用异步方式。
2.2 LazyQueue:
在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
- 消费者宕机或出现网络故障
- 消息发送量激增,超过了消费者处理速度
- 消费者处理业务发生阻塞
旦出现消息堆积问题,RabbitMQ 的内存占用就会越来越高,直到触发内存预警上限。此时 RabbitMQ 会将内存消息刷到磁盘上,这个行为成为PageOut
. PageOut
会耗费一段时间,并且会阻塞队列进程。因此在这个过程中 RabbitMQ 不会再处理新的消息,生产者的所有请求都会被阻塞。
为了解决这个问题,从 RabbitMQ 的 3.6.0 版本开始,就增加了 Lazy Queues 的模式,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
- 支持数百万条的消息存储
而在 3.12 版本之后,LazyQueue 已经成为所有队列的默认格式。因此官方推荐升级 MQ 为 3.12 版本或者所有队列都设置为 LazyQueue 模式。
2.2.1 控制台配置 Lazy 模式:
在添加队列的时候,添加x-queue-mod=lazy
参数即可设置队列为 Lazy 模式:
2.2.2 代码配置 Lazy 模式:
在利用 SpringAMQP 声明队列的时候,添加x-queue-mod=lazy
参数也可设置队列为 Lazy 模式:
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue") // 表示创建一个名为 lazy.queue 的持久化队列
.lazy() // 开启Lazy模式
.build();
}
这里是通过QueueBuilder
的lazy()
函数配置Lazy模式,底层源码如下:
当然,我们也可以基于注解来声明队列并设置为 Lazy 模式:
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}
三、消费者可靠性
当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:
- 消息投递的过程中出现了网络故障
- 消费者接收到消息后突然宕机
- 消费者接收到消息后,因处理不当导致异常
- …
一旦发生上述情况,消息也会丢失。因此,RabbitMQ 必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。
但问题来了:RabbitMQ 如何得知消费者的处理状态呢?
接下来我们一起来学习一下消费者处理消息时的可靠性解决方案。
3.1 消费者确认机制:
为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ 从队列中删除该消息
- nack:消息处理失败,RabbitMQ 需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息
一般 reject 方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch
机制捕获,消息处理成功时返回 ack,处理失败时返回 nack.
由于消息回执的处理代码比较统一,因此 SpringAMQP 帮我们实现了消息确认。并允许我们通过配置文件设置处理方式,有三种模式:
none
:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用manual
:手动模式。需要自己在业务代码中调用 api,发送ack
或reject
,存在业务入侵,但更灵活auto
:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:- 如果是业务异常,会自动返回
nack
; - 如果是消息处理或校验异常,自动返回
reject
;
- 如果是业务异常,会自动返回
通过下面的配置可以修改 SpringAMQP 的 ACK 处理方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 采用自动模式
我们一般使用 auto 模式。
3.2 失败重试机制:
当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue 到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息 requeue 就会无限循环,导致 mq 的消息处理飙升,带来不必要的压力:
当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况 Spring 又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 mq 队列。
修改 consumer 服务的 application.yml 文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
配置完后需要重启 consumer 服务。
开启失败重试机制的效果如下:开启本地重试时,消息处理过程中抛出异常,不会 requeue 到队列,而是在消费者本地重试,重试达到最大次数后,Spring 会返回 reject,消息会被丢弃。
3.3 失败处理策略:
本地达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此 Spring 允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有 3 个不同实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
RepublishMessageRecoverer
使用示例如下:
- 在 consumer 服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
- 定义一个 RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
3.4 业务幂等性:
幂等在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。
例如:根据 id 删除数据,查询数据。
但是有的操作不是幂等的,例如扣减余额。
因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:
- 唯一消息 ID
- 业务状态判断
3.4.1 唯一消息 ID:
这个思路非常简单:
- 每一条消息都生成一个唯一的 ID,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息 ID 保存到数据库。
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
如何给消息添加唯一 ID 呢?
其实很简单,SpringAMQP 的 MessageConverter 自带了 MessageID 的功能,我们只要开启这个功能即可。
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
3.4.2 业务判断:
业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
例如要把订单状态从未支付修改成已支付,我们可以在执行业务时判断订单状态是不是未支付,如果不是未支付,说明订单已经被处理过了,无需重复处理。
3.5 兜底方案:
虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息 100% 的可靠。万一真的 MQ 通知失败该怎么办呢?
有没有什么兜底的方案?
解决思路:既然 MQ 通知不一定发送到消费者服务,那么消费者服务就必须自己主动去查询。这样即便消费者服务的 MQ 通知失败,我们依然能通过主动查询来保证业务状态的一致。
这里又出现一个问题:消费者什么时候主动查询?总不能一开始就查吧。
这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔 20 秒就查询一次,并判断发送者业务状态。如果发现发送者业务状态已经修改,则立刻更新消费者业务状态即可。
至此,消息可靠性的问题已经解决了。
四、延迟消息
在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。
但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!
因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。
像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用 MQ 的延迟消息了。
在 RabbitMQ 中实现延迟消息也有两种方案:
- 死信交换机 + TTL
- 延迟消息插件
4.1 死信交换机和延迟消息:
4.1.1 死信交换机:
什么是死信?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue
参数设置为 false - 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果一个队列中的消息已经成为死信,并且这个队列通过**dead-letter-exchange
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机**(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
4.1.2 延迟消息:
前面两种作用场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer
作用类似。
而最后一种场景,大家设想一下这样的场景:
如图,有一组绑定的交换机(ttl.fanout
)和队列(ttl.queue
)。但是ttl.queue
没有消费者监听,而是设定了死信交换机hmall.direct
,而队列direct.queue1
则与死信交换机绑定,RoutingKey是 blue:
假如我们现在发送一条消息到ttl.fanout
,RoutingKey 为 blue,并设置消息的有效期为 5000 毫秒:
注意:尽管这里的ttl.fanout
不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct
才能正确路由消息。
消息肯定会被投递到ttl.queue
之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信,死信被再次投递到死信交换机hmall.direct
,并沿用之前的 RoutingKey,也就是blue
,由于direct.queue1
与hmall.direct
绑定的 key 是 blue,因此最终消息被成功路由到direct.queue1
,如果此时有消费者与direct.queue1
绑定, 也就能成功消费消息了。但此时已经是 5 秒钟以后了。
也就是说,publisher 发送了一条消息,但最终 consumer 在 5 秒后才收到消息。我们成功实现了延迟消息。
注意:
RabbitMQ 的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的 TTL 时间不一定准确。
4.2 DelayExchange 插件:
基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此 RabbitMQ 社区提供了一个延迟消息插件来实现相同的效果。
官方文档说明:
https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq
下面演示使用,不演示下载。
4.2.1 基于注解方式声明延迟交换机:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
4.2.2 基于 @Bean 的方式:
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
4.2.3 发送延迟消息:
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
注意:
延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。
参考文献:
- 黑马程序员
结语:
其实写博客不仅仅是为了教大家,同时这也有利于我巩固知识点,和做一个学习的总结,由于作者水平有限,对文章有任何问题还请指出,非常感谢。如果大家有所收获的话,还请不要吝啬你们的点赞收藏和关注,这可以激励我写出更加优秀的文章。