消息发送到RabbitMQ,然后被消费端消费,在这个过程中,RabbitMQ是在怎么保证消息不丢失的?
1.发送方
(1)RabbitMQ 事物
事务,保证Product发送消息的业务操作完全正确执行后,发送到RabbitMQ的数据才是有效的。
channel.txSelect() 用于将当前信道设置成事务模式;
channel.txCommit() 用于提交事务;
channel.txRollback() 用于事务回滚。
(2)发送方确认机制
AMQP协议层提供了事务机制来保障生产者发送的消息真正达到了RabbitMQ,但采用事务机制实现将严重影响RabbitMQ的消息吞吐量,RabbitMQ引入了另一种轻量级方式——发送方确认(publisher confirm)机制。
实现:
生产者调用channel.confirmSelect()方法,也就是Confirm.Select命令,将信道设置为发送方确认模式。
boolean waitForConfirms() throws InterruptedException;
boolean waitForConfirms(long timeout)
throws InterruptedException,TimeoutException;
void waitForConfirmsOrDie() throws IOException, InterruptedException;
void waitForConfirmsOrDie(long timeout)
throws IOException, InterruptedException,TimeoutException;
①单条确认机制
代码实现:
//开启发送方确认机制
channel.confirmSelect();
for (int i = 0; i < 100; i++) {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
// 逐条确认是否发送成功
if(channel.waitForConfirms()){
System.out.println("send success!");
}
}
②批量confirm方式
代码实现:
//开启发送方确认机制
channel.confirmSelect();
for (int i = 0; i < 100; i++) {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY ,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
}
// 批量确认是否发送成功 如果某一次确认失败 这一批都要重新发送
if(channel.waitForConfirms()){
System.out.println("send success!");
}
③异步confirm方式
代码实现:
//开启发送方确认机制
channel.confirmSelect();
//定义一个未确认消息集合
final SortedSet<Long> unConfirmSet =
Collections.synchronizedNavigableSet(new TreeSet<Long>());
//添加消息确认监听器
channel.addConfirmListener(new ConfirmListener() {
//拒绝消息
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("拒绝消息 deliveryTag:"+deliveryTag+" multiple:"+multiple);
}
//确认消息
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("确认消息 deliveryTag:"+deliveryTag+" multiple:"+multiple);
if(multiple){
//multiple为true,则deliveryTag之前的所有u消息全部被确认
unConfirmSet.headSet(deliveryTag+1).clear();
}else{
//否则只确认一条消息
unConfirmSet.remove(deliveryTag);
}
}
});
for (int i = 0; i < 100; i++) {
//得到消息的deliveryTag
long deliveryTag = channel.getNextPublishSeqNo();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY ,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
unConfirmSet.add(deliveryTag);
}
(3)备份交换器、备份队列
消息路由失败会怎样:
在发送消息channel.basicPublish()方法中有两个参数分别是mandatory和immediate,它们都有当消息在传递过程中不可达目的地时,将消息返回给生产者的功能。RabbitMQ提供的备份交换器(Alternate Exchange)可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。
备份交换器可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。
代码实现:
Map<String, Object> args = new HashMap<String, Object>();
//为交换器绑定备份交换器
args.put("alternate-exchange", EXCHANGE_NAME_BAK);
//创建一个交换器 设置备份交换器
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,false,false, args);
channel.exchangeDeclare(EXCHANGE_NAME_BAK,"fanout",false,false,false,null);
//创建队列
channel.queueDeclare(QUEUE_NAME, true, false , false , null);
channel.queueDeclare(QUEUE_NAME_BAK, true, false , false , null);
//将交换器与队列通过路由键绑定
channel.queueBind(QUEUE_NAME , EXCHANGE_NAME , ROUTING_KEY);
channel.queueBind(QUEUE_NAME_BAK , EXCHANGE_NAME_BAK , "");
String message = "Hello Queue.";
//消息RoutingKey与交换器绑定路由的BindingKey不匹配,消息路由失败。
channel.basicPublish(EXCHANGE_NAME, "123", true ,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
关于备份交换器:
- 如果设置的备份交换器不存在,客户端和RabbitMQ服务端都不会出现异常,此时消息会丢失。
- 如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务端都不会出现异常,此时消息会丢失。
- 如果备份交换器没有任何匹配的队列,客户端和RabbitMQ服务端都不会出现异常,此时消息会丢失。
- 如果备份交换器和mandatory参数一起使用,如果备份交换器有效,那么mandatory参数无效 。
2.服务节点broker
(1)队列和消息的持久化
消息保存在服务节点上,为了防止消息丢失,可以将消息持久化到磁盘,即:设置消息为可持久化。
注意:消息持久化的前提是,队列要支持持久化。
队列持久化:
Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable,
boolean exclusive, boolean autoDelete,
Map<String,Object> arquments) throws IOException;
交换器持久化:
Exchange.DeclareOk channel.exchangeDeclare(String exchange,
String type, boolean durable,boolean autoDelete, boolean internal,
Map<String, Object> arquments) throws IOException;
消息持久化:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2); //持久化消息
builder.expiration("10000"); //设置消息超时时间 ,单位:ms
(2)消息的过期
死信交换器、死信队列:
DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(deadmessage)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。
消息变成死信一般是由于以下几种情况:
- 消息被拒绝(Basic.Reject/Basic.Nack),井且设置requeue参数为false;
- 消息过期;(队列过期被删除不属于死信)
- 队列达到最大长度。
DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定。
3.接收方
消费消息有两种模式:推basicConsume() /拉basicGet()。接受方接收消息,可采用手动确认的方式,来确认消息是否被消费者获取成功。
客户端消费消息时,可设置autoAck参数,当autoAck等于true时, RabbitMQ会自动把发送出去的消息改为确认,然后移除,而不管消息是否真正被消费者接收到。当autoAck等于false时,客户端需要调用basicAck()方法确认消费。
void basicAck(long deliveryTag, boolean multiple) throws IOException;
- multiple为true时,确认消费该信道上deliveryTag之前的所有未确认的消息。
- multiple为false时,确认消费该信道上当前的消息。
如果RabbitMQ一直没有收到消费者的确认信号,并且该消费者已经断开连接,则RabbitMQ会安排消息重新进入队列(进入到队列头部,消息顺序不变,在没有真正被删除前,消息的位置不会改变),等待投递给下一个消费者(可以是原消费者)。
(1)推模式
服务器主动推送给消费者
String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Consumer callback)
throws IOException;
String basicConsume(String queue, boolean autoAck,
Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck,
String consumerTaq, Consumercallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag,
boolean noLocal, boolean exclusive,
Map<String, Object> arguments, Consumer callback) throws IOException;
其中一些参数具体作用:
autoAck:是否自动确认, false表示不自动确认;
consumerTag:消费者标签,用来区分多个消费者;
noLocal:设置为true则表示不能将同一个Connection中生产者的消息传送给这个Connection中的消费者
exclusive:设置是否排他
arguments:设置消费的其他参数;
callback:设置消息的回调函数。用来处理RabbitMQ推送过来的消息
注意:RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
(2)拉模式
消费者主动从服务器获取消息,basicGet()单条获取消息,但basicGet不能用于for循环中代替basicConsume。