Rabbitmq怎么保证消息的可靠性?

本文详细介绍了RabbitMQ如何在消费端和生产端通过设置autoAck、死信队列、消息持久化以及publisher-confirm和publisher-returns机制来提升消息的可靠性和错误处理。同时,展示了Java中如何在SpringBoot中使用这些特性进行配置和编程实现。
摘要由CSDN通过智能技术生成

一、消费端消息可靠性保证

  1. 消息确认(Acknowledgements)

消费者在接收到消息后,默认情况下RabbitMQ会自动确认消息(autoAck=true)。为保证消息可靠性,可以设置autoAck=false,使得消费者在处理完消息后手动发送确认(basicAck)。如果消费者在处理过程中发生异常或者未完成处理就终止运行,那么消息在超时时间内将不会被删除,会再次被RabbitMQ投递给其他消费者。

      2.死信队列(Dead Letter Queue):

当消息不能被正常消费时(比如达到最大重试次数),可以通过设置TTL(Time To Live)或者死信交换器(Dead Letter Exchange)将消息路由至死信队列,从而有机会后续分析和处理这些无法正常消费的消息。

二、生产端消息可靠性保证:

  1. 消息持久化

当生产者发布消息时,可以选择将其标记为持久化(persistent).这意味着即使 RabbitMQ 服务器重启,消息也不会丢失,因为它们会被存储在磁盘上。

       2.确认(Confirm)机制

开启confirm回调模式后,RabbitMQ会在消息成功写入到磁盘并至少被一个交换器接受后,向生产者发送一个确认(acknowledgement)。若消息丢失或无法投递给任何队列,RabbitMQ将会发送一个否定确认(nack). 生产者可以根据这些确认信号判断消息是否成功送达并采取相应的重试策略。

RabbitMQ作为消息中间件并启用publisher confirms(发布者确认)与publisher returns(发布者退回)机制时,可以确保消息从生产者到交换机的投递过程得到更准确的状态反馈。



1.@PostConstruct注解

@PostConstruct注解是Java EE规范中的一部分,主要用于标记在一个Bean初始化完成后需要执行的方法。这个注解由JSR-250定义,并且在Spring框架以及其他遵循Java EE标准的应用服务器中广泛支持。

功能与用途:初始化方法,当容器完成对Bean的实例化并且所有依赖注入完成后,将会自动调用标有@PostConstruct注解的方法。这为开发者提供了一个机会,在对象正式投入使用之前进行一些必要的初始化工作,比如初始化资源、预计算某些值、启动后台任务等增强。

2. Publisher Confirms(发布者确认)

作用: Publisher Confirm机制允许RabbitMQ服务器通知生产者一个消息是否已经被交换机正确接收。当publisher-confirm-type设置为CORRELATED时,RabbitMQ会向生产者发送确认或否定响应,确认消息已到达交换机,但不保证消息已被路由到至少一个队列中。

生产者到交换机的确认(消息到达交换机)

2.1.配置:

spring.rabbitmq.publisher-confirm-type = CORRELATED

2.2. 代码实现

只要到达交换机就会触发

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            // 消息成功投递成功并被确认
        } else {
            // 消息未能正确投递
        }
    }
});

3.Publisher Returns(发布者退回)

作用: Publisher Return机制用于当消息无法按照路由键规则路由到任何队列时,或者由于其他原因(例如队列满、消息过大等)而被交换机拒绝时,RabbitMQ将消息返回给生产者。

交换机到队列的确认(消息是否正常发送到了队列)

通过实现 ReturnCallback 接口,发送消息失败返回,比如交换机路由不到队列时触发回调:

1.只有消息没有路由到队列的时候,才触发该回调 .

2.只要有一个队列接受到消息了,它就认为成功.

3.1 配置

spring.rabbitmq.publisher-returns = true

3.2 代码实现

rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 处理未被路由或因某种原因被退回的消息
    }
});

4.完整代码

4.1消费者

/*
 * Copyright (c) 2020, 2024,  All rights reserved.
 *
 */
package com.by.consumer;

import cn.hutool.core.map.MapUtil;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * <p>Project: spring-boot-rabbitMQ - DirectConsumer</p>
 * <p>Powered by scl On 2024-04-07 16:57:20</p>
 * <p>描述:<p>
 *
 * @author 孙臣龙 [1846080280@qq.com]
 * @version 1.0
 * @since 17
 */
@Configuration
public class ReliabilityConsumer2 {

    //注册队列
    @Bean
    public Queue queue1() {
        return QueueBuilder.durable("Re_Q01").deadLetterExchange("dead_E01").deadLetterRoutingKey("DK01").build();
    }

    //注册交换机
    @Bean
    public CustomExchange exchange() {
        Map<String, Object> map = MapUtil.of("x-delayed-type", "direct");
        return new CustomExchange("Re_E01", "x-delayed-message", true, false, map);
    }

    //绑定交换机和队列
    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue1()).to(exchange()).with("RK01").noargs();
    }

    //注册一个死信交换机
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange("dead_E01");
    }

    //注册一个死信队列
    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable("dead_Q01").build();
    }
    //绑定死信交换机和死信队列
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("DK01");
    }


    //启动一个消费者
    @RabbitListener(queues = "Re_Q01")
    public void receiveMessage(OrderKO msg) {
        System.out.println("消费者2:" + msg);
    }
}

 4.2生产者

/*
 * Copyright (c) 2020, 2024,  All rights reserved.
 *
 */
package com.by.provider;

import com.by.consumer.OrderKO;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.UUID;

/**
 * <p>Project: spring-boot-rabbitMQ - DirectProvider</p>
 * <p>Powered by scl On 2024-04-07 17:06:41</p>
 * <p>描述:<p>
 *
 * @author 孙臣龙 [1846080280@qq.com]
 * @version 1.0
 * @since 17
 */
@Service
public class ReliabilityProvider implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    //启动一个生产者
    public void send(OrderKO orderKO) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        System.out.println("callbackSender UUID: " + correlationData.getId());
        rabbitTemplate.convertAndSend(
                "Re_E01",
                "RK01",orderKO,
                m-> m,correlationData);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        if (b) {
            System.out.println("消息发送成功");
        } else {
            System.out.println("消息发送失败");
        }
    }

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("消息丢失");
    }
}

4.3配置文件

spring.rabbitmq.publisher-confirm-type = CORRELATED
spring.rabbitmq.publisher-returns = true

4.4测试

 @Test
    void test6() throws InterruptedException, IOException {
        for (int i = 1; i <= 5; i++) {
            OrderKO orderKO = OrderKO.builder().id(i).name("孙臣龙" + i).build();
            System.out.println("发送消息"+i);
            reliabilityProvider.send(orderKO);
        }
        Thread.sleep(10000);
        //System.in.read();
    }

<think>好的,用户现在问的是如何保证RabbitMQ消息的可靠传输。之前他们问过如何避免重复消费,而这次的问题更侧重于消息传输的可靠性。我需要回忆一下之前回答的结构,确保这次回答同样详细且有结构。 首先,我需要确定用户可能的使用场景。他们可能在开发一个分布式系统,使用RabbitMQ作为消息中间件。可靠传输是确保消息从生产者到消费者都不丢失的关键,特别是在金融、电商等对数据一致性要求高的领域。用户可能是开发者或架构师,需要确保系统的高可靠性。 接下来,用户的需求是什么?他们想知道如何配置RabbitMQ来防止消息丢失,可能涉及生产者的确认机制、队列持久化、消费者的手动确认等。用户可能已经遇到消息丢失的问题,或者正在设计系统,希望提前规避风险。 深层需求方面,用户可能希望了解全面的解决方案,包括生产端、Broker端和消费端的策略,以及监控和灾备措施。他们可能还需要实际配置示例,比如代码片段或配置参数,以便直接应用。 回想之前的回答结构,用户可能习惯分点说明,每个部分有原理和实现方式。我需要保持类似的格式,确保清晰易读。同时,可能需要对比不同确认模式(如事务、Confirm模式)的优缺点,帮助用户选择适合的方案。 需要注意RabbitMQ的特性,比如持久化需要同时设置队列和消息的属性,手动ACK的重要性,以及如何处理网络故障或服务器崩溃的情况。此外,高可用性方面,镜像队列和集群部署可能也是用户关心的点。 在组织内容时,可以分三个主要部分:生产者、Broker、消费者,每个部分详细说明如何确保可靠传输。最后补充高可用和监控措施,确保全面覆盖。 可能用户对某些术语不熟悉,比如Publisher Confirm,需要简要解释。同时,给出配置示例或代码片段会更有帮助,比如Java或Spring Boot的配置,但用户可能使用不同的语言,所以保持示例通用或注明语言。 还要注意之前回答中的注意事项部分,比如性能权衡,确保用户了解可靠性和性能之间的平衡,比如持久化会增加开销,手动ACK可能影响吞吐量,但这是必要的代价。 最后,确保回答结构清晰,重点突出,使用用户容易理解的语言,避免过于技术化但又不失专业性。可能还需要提及一些常见错误,比如忘记设置持久化或错误处理ACK,导致消息丢失的情况。</think>在 RabbitMQ保证消息的可靠传输,需要从 **生产者、Broker、消费者** 三个环节协同设计,以下是核心方案: --- ### **一、生产者端:确保消息成功投递** #### 1. **事务机制(不推荐高性能场景)** ```java channel.txSelect(); // 开启事务 try { channel.basicPublish(exchange, routingKey, props, body); channel.txCommit(); // 提交事务 } catch (Exception e) { channel.txRollback(); // 回滚 } ``` - **缺点**:同步阻塞,吞吐量下降约 2~10 倍。 #### 2. **Publisher Confirm 模式(推荐)** - **异步确认**:生产者发送消息后,Broker 异步返回确认信号(ACK/NACK)。 - **实现方式**: ```java channel.confirmSelect(); // 开启 Confirm 模式 channel.addConfirmListener((sequenceNumber, multiple) -> { // ACK 回调 // 消息成功持久化到磁盘(若配置) }, (sequenceNumber, multiple) -> { // NACK 回调 // 消息投递失败,需重试或记录 }); ``` - **优势**:非阻塞,支持批量确认(`multiple=true`)。 --- ### **二、Broker 端:防止消息丢失** #### 1. **队列持久化** ```java // 声明队列时设置 durable=true channel.queueDeclare("order_queue", true, false, false, null); ``` - **要求**:队列元数据(名称、属性)持久化到磁盘。 #### 2. **消息持久化** ```java AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .deliveryMode(2) // 1=非持久,2=持久 .build(); channel.basicPublish(exchange, routingKey, props, body); ``` - **注意**:需同时设置队列和消息为持久化,否则重启后消息仍可能丢失。 #### 3. **镜像队列(高可用)** ```shell # 配置策略,镜像到所有节点 rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}' ``` - **作用**:消息在集群多个节点间复制,避免单点故障。 --- ### **三、消费者端:确保消息正确处理** #### 1. **手动 ACK 确认** ```java channel.basicConsume(queue, false, (consumerTag, delivery) -> { // autoAck=false try { processMessage(delivery.getBody()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 手动ACK } catch (Exception e) { // 处理失败:NACK+重入队列 或 记录死信 channel.basicNack(deliveryTag, false, true); } }, consumerTag -> {}); ``` - **关键点**: - 关闭 `autoAck`(避免消息未处理完即丢失)。 - ACK 应在业务逻辑成功**后**发送。 - 失败时选择 `basicNack(requeue=true)` 或转入死信队列。 #### 2. **消费端重试机制** - **本地重试**:捕获异常后重试(需限制最大次数)。 - **死信队列(DLX)**:超过重试次数后转发到 DLX,人工干预处理。 ```java Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "dlx.exchange"); channel.queueDeclare("order_queue", true, false, false, args); ``` --- ### **四、补充保障措施** #### 1. **网络断连自动恢复** ```java ConnectionFactory factory = new ConnectionFactory(); factory.setAutomaticRecoveryEnabled(true); // 自动重连 factory.setNetworkRecoveryInterval(5000); // 重试间隔 ``` #### 2. **消息落库 + 状态追踪** - 生产者在发送前将消息写入本地数据库,标记状态为「发送中」。 - 通过定时任务补偿未确认的消息(类似 MQTT 的 QoS 2 机制)。 #### 3. **监控报警** - 监控指标:未确认消息数、队列堆积量、节点健康状态。 - 工具:Prometheus + Grafana、RabbitMQ Management Plugin。 --- ### **可靠性保障全链路总结** | 环节 | 技术方案 | 目标 | |----------|-----------------------------------|--------------------------| | 生产者 | Confirm 模式 + 消息持久化 | 确保消息到达 Broker | | Broker | 队列持久化 + 镜像队列 | 防止消息丢失和单点故障 | | 消费者 | 手动 ACK + 死信队列 + 重试 | 保证消息最终被消费 | | 兜底措施 | 本地消息表 + 监控报警 | 业务层补偿和快速故障发现 | --- ### **注意事项** 1. **性能权衡**:持久化和 Confirm 模式会增加延迟,需根据业务容忍度调整。 2. **幂等性**:重试机制可能导致重复消费,需结合[消息去重方案](链接)实现。 3. **脑裂问题**:镜像队列在集群网络分区时可能产生数据不一致,需配置 `pause_minority` 策略。 通过以上多级保障,可达到 **99.99%+** 的消息可靠性(实际取决于硬件和网络稳定性)。
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值