消息何去何从
mandatory和immediate是channel.basicPublish方法中的两个参数,它们都有消息传递过程中不可达目的地时将消息返回给生产者的功能。
mandatory参数
当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么多RabbitMQ会调用 Basic.Return命令将消息返回给生产者。
当mandatory参数为false时,出现上述情形,则消息直接被丢弃。
channel.basicPublish(EXCHANGE NAME , "", true,
MessageProperties . PERSISTENT TEXT PLAIN ,
"mandatory test" . getBytes());
channel.addReturnListener(new ReturnListener() (
public void handleReturn(int replyCode , String replyText ,
String exchange , String routingKey,
AMQP.BasicProperties basicProperties,
byte[] body) throws IOException {
String message = new String(body);
System.out.println( "Basic.Return 返回的结果是: "+message ); }
}
});
上述代码中生产者没有成功地将消息路由到队列,些时RabbitMQ会通过Basic.Return返回"mandatory test"这条消息,之后生产者客户端通过ReturnListener监听到这个事件,上面代码的最后输出应该是"Basic.Return"返回的结果是:"mandatory test"
immediate参数
当immediate参数设为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回至生产者。
备份交换器(Alternate Exchange)
生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失;如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂。
Map<String, Object> args = new HashMap<String , Object>();
args.put("a1ternate-exchange" , "myAe");
channel.exchangeDec1are( "normalExchange" , "direct" , true , false , args);
channel . exchangeDec1are( "myAe " , "fanout" , true , false , null) ;
channel .queueDec1are( "normalQueue " , true , false , false , null);
channel .queueBind( normalQueue "normalExchange" , " normalKey");
channel .queueDec1are( "unroutedQueue " , true , false , false , null);
channel.queueBind("unroutedQueue","myAe","");
备份交换器,几种特殊情况:
1)如果设置的备份交换器不存在,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失;
2)如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失;
3)如果备份交换器没有任何匹配的队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
4)如果备份交换器和mandatory参数一起使用那么mandatory参数无效。
过期时间TTL
Map<String, Object> argss = new HashMap<String , Object>();
argss.put("x-message-ttl " , 6000);
channel . queueDeclare(queueName , durable , exclusive , autoDelete , argss) ;
死信队列
DLX Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为列信邮箱。当消息在一个队列中变成死信之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。
消息变成死信一般是由于以下几种情况:
1)消息被拒绝,并且设置requeue参数为false
2)消息过期;
3)队列达到最大长度;
延迟队列
延迟队列存储的对象是对应的延迟消息,所谓的延迟消息是批当消息被发送以后,并不想让消息者立刻拿到消息,而是等待待定时间后,消费者才能拿到这个消息进行消费。
应用场景:
1)在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延迟队列来处理这些订单了。
2)用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间一了再将指令推送到智能设备。
优先级队列
具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。
AMQP .BasicProperties .Bui1der builder = new AMQP . BasicProperties. Builder() ;
builder . priority(5) ;
AMQP . BasicProperties properties = bu lder . build () ;
channel.basicPub1ish( " exchange_priority" , " rk_priority" , properties, ("message" ) . getBytes () ) ;
RPC实现
Remote Procedure Call 即远程过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术。RPC的主要功能是让构建分布式计算更容易在提供强大的远程调用能力时不损失本地调用的语义简洁性。
String callbackQueueName = channel.queueDeclare() . getQueue();
BasicProperties props = new
BasicProperties .Bu lder() . replyTo(callbackQueueName) . build();
channel.bas cPubl sh( "" ," rpc queue " , props, message.getBytes()) ;
RPCServer:
public class RPCServer {
private static final String RPC_QUEUE_NAME = " rpc_ queue ";
public static void main(String args[]) throws Exception {
channel . queueDeclare(RPC_QUEUE_ NAME , false , false , false , null) ;
channel . basicQos (1) ;
System . out.println( " [x] Awaiting RPC requests " );
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope ,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
AMQP . BasicProperties replyProps = new AMQP.BasicProperties.Builder ()
.correlationId(properties.getCorrelationId())
.build () ;
String response = "";
try {
String message = new String(body, "UTF-8 " ) ;
int n= Integer.parselnt(message) ;
System.out . println( " [ . ] fib( " + message + " ) " );
response += fib(n) ;
} catch (RuntimeExcept on e) {
System.out . println( " [.] " + e . toString()) ;
) finally {
channel.basicPubl sh(" properties.getReplyTo() ,
replyProps, response.getBytes( " UTF-8"));
channel . basicAck(envelope.getDeliveryTag() , false);
channel . basicConsume(RPC QUEUE NAME , false , consumer) ;
private static int fib(int n) {
if (n == 0) return 0 ;
if(n == 1) return 1 ;
return fib(n - 1) + b(n - 2) ;
}
}
RPCClient:
public class RPCClient {
private Connection connection ;
private Channel channel ;
private String requestQueueName = " rpc queue ";
private String replyQueueName ;
private QueueingConsumer consumer;
public RPCClient() throws IOException , TimeoutException {
replyQueueName = channel . queueDeclare () . getQueue () ;
consumer = new QueueingConsumer (c h annel );
channel.basicConsume(replyQueueName , true , consumer) ;
public String call (String message ) throws IOException ,
ShutdownSignalException, ConsumerCancelledException ,
InterruptedException {
String response = null ;
String corrld = UUID . randomUUID () . toString ();
BasicProperties props = new BasicPrope r ties . Bui lder ()
. correlationld (corrld)
.replyTo(replyQueueName)
.build () ;
channel.basicPublish ("" , request Que ueName , prop s , message .getBytes ());
while (true) {
QueueingConsumer . Delivery delivery = consumer . nextDelivery () ;
if(delivery . getProperties () . getCorrelationld () .equals(corrld) ) {
response = new String de very getBody ()) ;
break;
}
}
return response ;
}
public voild close() throws Exception{
connection . close();
}
public static void main(String args[)) throws Exception{
RPCClient fibRpc = new RPCClient() ;
System . out .println(" [x) Requesting fib(30)");
String response = fibRpc . call( " 30 ");
System out .p ri tln (" [.) Got '"+response+"'" );
fibRpc.close() ;
}
}
持久化
交换器的持久化、队列的持久化和消息的持久化。
public static final BasicProperties PERSISTENT TEXT PLAIN =
new BasicProperties( " text/plain" ,
null ,
null ,
2 , //deliveryMode
0,null, null , null ,
null , null , null , null ,
null , null) ;
生产者确认
在使用RabbitMQ的时候,可以通过消息持久化操作来解决因为服务器的异常崩溃而导致 的消息丢失,除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。
两种解决方式:
1)通过事务机制实现
channel . txSelect();
channel.basicPublish(EXCHANGE NAME , ROUTING KEY ,
MessageProperties . PERSISTENT TEXT_PLAIN,
"transaction messages".getBytes());
channel . txCommit();
事务回滚:
try (
channel.txSelect() ;
channel.basicPublish(exchange , rout 工口 gKey
MessageProperties . PERSISTENT TEXT PLAIN , msg.getBytes());
int result = 1 / 0 ;
channel . txCommit() ;
) catch (Exception e) (
e.printStackTrace();
channel . txRollback();
}
2)通过发送方确认机制实现
try {
channel.confirmSelect() // 将信道置为 publisher confirm 模式
//之后正常发送消息
channel . basicPublish( "exchange " , " routingKey" , null ,
"publisher confirm test " .getBytes());
if (!channel . waitForConfirms()) {
System.out . println( " send message failed" ) ;
// do something else..
} catch (InterruptedException e) (
e.printStackTrace() ;
}