目录
消息何去何从
mandatory和immediate是channel.basicPublish方法中的两个参数,它们都有 当消息传递过程中不可达目的地时将消息返回给生产者的功能。RabbitMQ提供的备份交换器 (Alternate Exchange)可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存 储起来,而不用返回给客户端。
对于初学者来说,特别容易将mandatory和immediate这两个参数混淆,而对于备份 交换器更是一筹莫展,
mandatory参数
当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件 的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当mandatory参 数设置为false时,出现上述情形,则消息直接被丢弃。
那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用 channel.addReturnListener来添加ReturnListener监听器实现。
使用mandatory参数的关键代码如代码清单所示。
上面代码中生产者没有成功地将消息路由到队列,此时RabbitMQ会通过Basic.Return 返回"mandatory test"这条消息,之后生产者客户端通过:ReturnListener监听到了这个事 件,上面代码的最后输出应该是"Basic.Return返回的结果是: mandatory test"。
从AMQP协议层面来说,其对应的流转过程如图所示。
immediate参数
当immediate参数设为true时,如果交换器在将消息路由到队列时发现队列上并不存在 任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时, 该消息会通过Basic.Return返回至生产者。
概括来说,mandatory参数告诉服务器至少将该消息路由到一个队列中,否则将消息返 回给生产者。
immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递; 如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等 待消费者了。
RabbitMQ 3.0版本开始去掉了对immediate参数的支待,对此RabbitMQ官方解释是:
immediate参数会影响镜像队列的性能,增加了代码复杂性,建议采用TTL和DLX的方法替代。
备份交换器
备份交换器,英文名称为Alternate Exchange, 简称AE, 或者更直白地称之为"备胎交换器”。
生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失; 如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者的代码将 变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器, 这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。
可以通过在声明交换器(调用channel.exchangeDeclare方法)的时候添加 alternate-exchange参数来实现,也可以通过策略的方式实现。 如果两者同时使用,则前者的优先级更高,会覆盖掉Policy的设置。
使用参数设置的关键代码如代码清单所示。
上面的代码中声明了两个交换器normalExchange和myAe, 分别绑定了normal Queue和 unroutedQueue这两个队列,同时将myAe设置为normalExchange的备份交换器。注意myAe 的交换器类型为fanout。
参考图, 如果此时发送一条消息到normalExchange上,当路由键等于"normal.Key"的 时候,消息能正确路由到normal Queue这个队列中。如果路由键设为其他值,比如"error Key", 即消息不能被正确地路由到与normalExchange绑定的任何队列上,此时就会发送给myAe, 进 而发送到unroutedQueue这个队列。
备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为fanout类型, 如若想设置为direct或者topic的类型也没有什么不妥。需要注意的是,消息被重新发送到 备份交换器时的路由键和从生产者发出的路由键是一样的。
考虑这样一种情况,如果备份交换器的类型是direct, 并且有一个与其绑定的队列,假设绑 定的路由键是key1, 当某条携带路由键为key2的消息被转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息丢失。如果消息携带的路由键为key1, 则可以存储到队 列中。
对于备份交换器,总结了以下几种特殊情况:
如果设置的备份交换器不存在,客户端和RabbitMQ服务端都不会有异常出现,此时消 息会丢失。
如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务端都不会有异常出现,此 时消息会丢失。
如果备份交换器没有任何匹配的队列,客户端和RabbitMQ服务端都不会有异常出现, 此时消息会丢失。
如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。
过期时间(TTL)
TTL, Time to Live的简称,即过期时间。RabbitMQ可以对消息和队列设置TTL。
设置消息的TTL
目前有两种方法可以设置消息的TTL。第一种方法是通过队列属性设置,队列中所有消息 都有相同的过期时间。
第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。如 果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准。
消息在队列中的生存时 间一旦超过设置的TTL值时,就会变成“死信" (Dead Message), 消费者将无法再收到该消息 (这点不是绝对的)。
通过队列属性设置消息TTL的方法是在channel.queueDeclare方法中加入 x-message-ttl参数实现的,这个参数的单位是毫秒。
示例代码如代码清单所示。
如果不设置TTL, 则表示此消息不会过期;
如果将TTL设置为0, 则表示除非此时可以直 接将消息投递到消费者,否则该消息会被立即丢弃,这个特性可以部分替代RabbitMQ 3.0版本 之前的immediate参数,之所以部分代替,是因为immediate参数在投递失败时会用 Basic.Return将消息返回(这个功能可以用死信队列来实现)。
针对每条消息设置TTL的方法是在channel.basicPublish方法中加入expiration 的属性参数,单位为毫秒。
关键代码如代码清单所示。
或者
还可以通过HTTP API接口设置:
对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方 法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费 者之前判定的。
为什么这两种方法处理的方式不一样?因为第一种方法里,队列中已过期的消息肯定在队 列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可。
而第二种方法里,每条消 息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将 被消费时再判定是否过期,如果过期再进行删除即可。
设置队列的TTL
通过channel.queueDeclare方法中的x-expires参数可以控制队列被自动删除前,处 于未使用状态的时间。
未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并 且在过期时间段内也未调用过Basic.Get命令。
设置队列里的TTL可以应用于类似RPC方式的回复队列,在RPC中,许多队列会被创建 出来,但是却是未被使用的。
RabbitMQ会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时。在 RabbitMQ重启后,持久化的队列的过期时间会被重新计算。
用于表示过期时间的x-expires参数以亳秒为单位,并且服从和x-message-ttl一样 的约束条件,不过不能设置为0。比如该参数设置为1000, 则表示该队列如果在1秒钟之内未 使用则会被删除。
代码清单演示了创建一个过期时间为30分钟的队列:
死信队列
DLX, 全称为Dead-Letter-Exchange, 可以称之为死信交换器,也有人称之为死信邮箱。当 消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个 交换器就是DLX, 绑定DLX的队列就称之为死信队列。
消息变成死信一般是由于以下几种情况:
消息被拒绝(Basic.Reject/Basic. Nack), 并且设置requeue参数为false; 令消息过期;
队列达到最大长度。
DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实 际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重 新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。