1.什么是MQ
MQ就是消息队列。是软件和软件进行通信的中间件产品,队列里存放的是message,是一种跨进程的通信机制,用于上下游传递信息
2.MQ解决的问题
流量削峰,减少高峰时期对服务器压力,将超出的请求可以到信息队列,处理时间会变长
应用解耦,A将BC都需要的数据发送到MQ,BC就不需要找A了,到MQ里面消费即可,A不需要在考虑给谁发了,也不需要在维护这个数据了
异步处理
A调用B,B花费的时间很长,但是A需要知道B什么时候执行完
使用MQ,当A 调用 B 服务后,只需要监听 B 处理完成的消息
当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务
3.什么是RabbitMQ
- 2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一
- 优点:
- 由于 erlang 语言的高并发特性,性能较好;吞吐量到万级
- MQ 功能比较完备,健壮、稳定、易 用、跨平台、支持多种语言 如:
- Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等
- 支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高
- 缺点:
- 商业版需要收费,学习成本较高
RabbitMQ的七大工作模式
简单模式: 一个生产者,一个消费者
work模式: 一个生产者,多个消费者,每个消费者获取到的消息唯一
订阅模式: 一个生产者发送的消息会被多个消费者获取
交换机会将消息发送到所有和它进行绑定的队列上,广播,群发
路由模式: 发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
直接交换机通过消息上的路由键直接对消息进行分发,相当于精确匹配,一对一
topic模式: 将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,
“#”匹配一个词或多个词,“*”只匹配一个词
这个交换机会将路由键和绑定上的模式进行通配符匹配,相当于模糊匹配,一对多
发送到主题交换的消息不能有任意的 routing_key - 它必须是单词列表,由点分隔。
这些词可以是任何东西,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:“ stock.usd.nyse ”、“ nyse.vmw ”、“ quick.orange.rabbit ”。路由键中可以有任意多的单词,最多为 255 个字节
- *(星号)可以只替换一个单词。
- # (hash) 可以代替零个或多个单词
- 绑定的时候出现情况
- 当一个队列绑定的是#,那么这个队列是不是接收了所有的消息,这个时候跟fanout有点像
- 当一个队列绑定的既没有#,也没有*,那就是一个direct
RPC模式: 使用RabbitMQ构建RPC系统:客户端和可伸缩RPC服务器
发布确认: 与发布者进行可靠的发布确认
单个确认 同步
批量确认 同步
异步确认
AMQP核心概念
AMQP: AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
虚拟主机(virtual host)或(vhost)
一组交换机、队列和绑定器被称为 虚拟主机(vhost)
一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
每一个RabbitMQ服务器都有一个默认的虚拟主机
交换机(exchange)
消息通常是发送到交换机
它指定消息按什么规则,路由到哪个队列。它可以被理解成具有路由表的路由程序。(发送消息的实体)
按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。
交换机可以存在多个,每个交换机在自己独立的进程当中执行,因此增加多个交换机就是增加多个进程
Exchange的类型
- Direct:直接交换机通过消息上的路由键直接对消息进行分发,相当于精确匹配,一对一
- Topic:这个交换机会将路由键和绑定上的模式进行通配符匹配,相当于模糊匹配,一对多
- Fanout:交换机会将消息发送到所有和它进行绑定的队列上,广播,群发
- Headers:消息头交换机使用消息头的属性进行消息路由,相当于模糊匹配(like header%),一对多
队列(queue)
消息队列,用来保存消息,供消费者消费。
队列是消息载体,每个消息都会被投入到一个或多个队列
试图创建一个已经存在的队列,RabbitMQ会直接忽略这个请求(接收消息的实体)。
绑定器(bind)
交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
作用:把exchange和queue按照路由规则绑定起来
将交换器和队列连接起来,并且封装消息的路由信息
连接(Connection)
连接,应用程序与Server的网络连接,TCP连接
与RabbitMQ Server建立的一个连接
由ConnectionFactory创建
每个connection只与一个物理的Server进行连接,此连接是基于Socket进行连接的
AMQP一般使用TCP
通道 (Channel)
消息通道(主要进行相关定义,发送消息,获取消息,事务处理等)
消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
消息队列持久化
创建一个队列的时候,可以是非持久化的,也可以是持久化的
非持久化:rabbitmq如果重启,该队列就会被删除
持久化:重启不影响
消息持久化必须要消息队列持久化
ACK
默认情况下,消费者接收到消息的时候(但是通常接到消息就处理了),就会进行自动应答(ACK)
如果一个消费者处理消息时间长,或者异常了,所以需要手动Ack
自动ACK
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// true 表示自动ack:autoAck
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
手动ACK
// 1.channel.basicConsume 将true改为false
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
// 2.在finally 中手动Ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
消息丢失的方式
发送方发送到消息队列的时候,丢了
交换机到队列中,丢了
队列到消费者,丢了,目前所学的知识点,队列中还有消息,再次发送
rabbitmq 怎么避免消息丢失?
消息持久化
ACK确认机制
设置集群镜像模式
消息补偿机制