RabbitMQ是怎么保证消息的可靠性的?

 消息发送到RabbitMQ,然后被消费端消费,在这个过程中,RabbitMQ是在怎么保证消息不丢失的?

1.发送方

(1)RabbitMQ 事物

 事务,保证Product发送消息的业务操作完全正确执行后,发送到RabbitMQ的数据才是有效的。

channel.txSelect() 用于将当前信道设置成事务模式;
channel.txCommit() 用于提交事务;
channel.txRollback() 用于事务回滚。

(2)发送方确认机制

 AMQP协议层提供了事务机制来保障生产者发送的消息真正达到了RabbitMQ,但采用事务机制实现将严重影响RabbitMQ的消息吞吐量,RabbitMQ引入了另一种轻量级方式——发送方确认(publisher confirm)机制。

实现:
 生产者调用channel.confirmSelect()方法,也就是Confirm.Select命令,将信道设置为发送方确认模式。

boolean waitForConfirms() throws InterruptedException;

boolean waitForConfirms(long timeout) 
    throws InterruptedException,TimeoutException;

void waitForConfirmsOrDie() throws IOException, InterruptedException;

void waitForConfirmsOrDie(long timeout) 
    throws IOException, InterruptedException,TimeoutException;
①单条确认机制

代码实现:

//开启发送方确认机制
channel.confirmSelect();
for (int i = 0; i < 100; i++) {
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
    MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes());
    // 逐条确认是否发送成功
    if(channel.waitForConfirms()){ 
        System.out.println("send success!");
    }
}
②批量confirm方式

代码实现:

//开启发送方确认机制
channel.confirmSelect();
for (int i = 0; i < 100; i++) {
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY ,
    MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes());
}
// 批量确认是否发送成功 如果某一次确认失败 这一批都要重新发送
if(channel.waitForConfirms()){
    System.out.println("send success!");
}
③异步confirm方式

代码实现:

//开启发送方确认机制
channel.confirmSelect();
//定义一个未确认消息集合
final SortedSet<Long> unConfirmSet =
Collections.synchronizedNavigableSet(new TreeSet<Long>());
//添加消息确认监听器
channel.addConfirmListener(new ConfirmListener() {
    //拒绝消息
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("拒绝消息 deliveryTag:"+deliveryTag+" multiple:"+multiple);
    }
    //确认消息
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("确认消息 deliveryTag:"+deliveryTag+" multiple:"+multiple);
        if(multiple){
            //multiple为true,则deliveryTag之前的所有u消息全部被确认
            unConfirmSet.headSet(deliveryTag+1).clear();
        }else{
            //否则只确认一条消息
            unConfirmSet.remove(deliveryTag);
        }
    }
});
for (int i = 0; i < 100; i++) {
    //得到消息的deliveryTag
    long deliveryTag = channel.getNextPublishSeqNo(); 
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY ,
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes());
    unConfirmSet.add(deliveryTag);
}

(3)备份交换器、备份队列

消息路由失败会怎样:
 在发送消息channel.basicPublish()方法中有两个参数分别是mandatory和immediate,它们都有当消息在传递过程中不可达目的地时,将消息返回给生产者的功能。RabbitMQ提供的备份交换器(Alternate Exchange)可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。

 备份交换器可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。
代码实现:

Map<String, Object> args = new HashMap<String, Object>();
//为交换器绑定备份交换器
args.put("alternate-exchange", EXCHANGE_NAME_BAK); 
//创建一个交换器 设置备份交换器
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,false,false, args);
channel.exchangeDeclare(EXCHANGE_NAME_BAK,"fanout",false,false,false,null);
//创建队列
channel.queueDeclare(QUEUE_NAME, true, false , false , null);
channel.queueDeclare(QUEUE_NAME_BAK, true, false , false , null);
//将交换器与队列通过路由键绑定
channel.queueBind(QUEUE_NAME , EXCHANGE_NAME , ROUTING_KEY);
channel.queueBind(QUEUE_NAME_BAK , EXCHANGE_NAME_BAK , "");
String message = "Hello Queue.";
//消息RoutingKey与交换器绑定路由的BindingKey不匹配,消息路由失败。
channel.basicPublish(EXCHANGE_NAME, "123", true ,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());

关于备份交换器:

  1. 如果设置的备份交换器不存在,客户端和RabbitMQ服务端都不会出现异常,此时消息会丢失。
  2. 如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务端都不会出现异常,此时消息会丢失。
  3. 如果备份交换器没有任何匹配的队列,客户端和RabbitMQ服务端都不会出现异常,此时消息会丢失。
  4. 如果备份交换器和mandatory参数一起使用,如果备份交换器有效,那么mandatory参数无效 。

2.服务节点broker

(1)队列和消息的持久化

 消息保存在服务节点上,为了防止消息丢失,可以将消息持久化到磁盘,即:设置消息为可持久化。

注意:消息持久化的前提是,队列要支持持久化。

队列持久化:

Queue.DeclareOk queueDeclare() throws IOException;

Queue.DeclareOk queueDeclare(String queue, boolean durable,
    boolean exclusive, boolean autoDelete, 
    Map<String,Object> arquments) throws IOException;

在这里插入图片描述

交换器持久化:

Exchange.DeclareOk  channel.exchangeDeclare(String exchange, 
    String type, boolean durable,boolean autoDelete, boolean internal, 
    Map<String, Object> arquments) throws IOException;

在这里插入图片描述

消息持久化:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2); //持久化消息
builder.expiration("10000"); //设置消息超时时间 ,单位:ms

(2)消息的过期

死信交换器、死信队列:
 DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(deadmessage)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。

消息变成死信一般是由于以下几种情况:

  1. 消息被拒绝(Basic.Reject/Basic.Nack),井且设置requeue参数为false;
  2. 消息过期;(队列过期被删除不属于死信)
  3. 队列达到最大长度。
     DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定。

3.接收方

 消费消息有两种模式:推basicConsume() /拉basicGet()。接受方接收消息,可采用手动确认的方式,来确认消息是否被消费者获取成功。

 客户端消费消息时,可设置autoAck参数,当autoAck等于true时, RabbitMQ会自动把发送出去的消息改为确认,然后移除,而不管消息是否真正被消费者接收到。当autoAck等于false时,客户端需要调用basicAck()方法确认消费。

void basicAck(long deliveryTag, boolean multiple) throws IOException;
  • multiple为true时,确认消费该信道上deliveryTag之前的所有未确认的消息。
  • multiple为false时,确认消费该信道上当前的消息。

 如果RabbitMQ一直没有收到消费者的确认信号,并且该消费者已经断开连接,则RabbitMQ会安排消息重新进入队列(进入到队列头部,消息顺序不变,在没有真正被删除前,消息的位置不会改变),等待投递给下一个消费者(可以是原消费者)。

(1)推模式

服务器主动推送给消费者

String basicConsume(String queue, Consumer callback) throws IOException;

String basicConsume(String queue, boolean autoAck, Consumer callback) 
    throws IOException;

String basicConsume(String queue, boolean autoAck, 
    Map<String, Object> arguments, Consumer callback) throws IOException;

String basicConsume(String queue, boolean autoAck, 
    String consumerTaq, Consumercallback) throws IOException;

String basicConsume(String queue, boolean autoAck, String consumerTag, 
    boolean noLocal, boolean exclusive, 
    Map<String, Object> arguments, Consumer callback) throws IOException;

其中一些参数具体作用:
autoAck:是否自动确认, false表示不自动确认;
consumerTag:消费者标签,用来区分多个消费者;
noLocal:设置为true则表示不能将同一个Connection中生产者的消息传送给这个Connection中的消费者
exclusive:设置是否排他
arguments:设置消费的其他参数;
callback:设置消息的回调函数。用来处理RabbitMQ推送过来的消息

注意:RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。

(2)拉模式

 消费者主动从服务器获取消息,basicGet()单条获取消息,但basicGet不能用于for循环中代替basicConsume。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值