# 消息队列
![](./images/message_queue.png)
## 什么是消息队列
消息队列(Message Queue)是一种进程间通信或同一进程的不同线程间的通信方式。
比如Go里面的channel就是一种典型的消息队列, 他用于不同的goroutine进行通信
```go
func main() {
ch := make(chan, 5)
// 生产消息(子Gorouine)
go func() {
for i:=0;i<10;i++ {
ch <- i
}
}()
// 消费消息(主Goroutine)
for v := range ch {
fmt.Println(v)
}
}
```
这种语言内置的消息队列数据结构, 只能用于进程内通信, 一旦跨进程通信就无法使用
## 消息服务器
如果我们把消息队列这种数据结构 做成一个服务, 让他监听一个端口接收消息,然后把消息转发给其他进程 是不是就实现了 进程间的 基于消息队列的通信。
像上面说的这种 用于接收和转发消息的 服务,往往被叫做: 消息服务器, 或者消息中间件
![](./images/mq_server.webp)
## 使用场景
消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然, 基于这个衍生出来很多使用场景:
+ 异步处理:例如短信通知、终端状态推送、App推送、用户注册等
有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
+ 数据同步:业务数据推送同步
比如阿里云有个事件通知服务, 它把会消息放到你指定的队列中, 用于事件通知
+ 重试补偿:记账失败重试
创建的失败重试队列就是这种场景的应用
+ 流量消峰:秒杀场景下的下单处理
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃
+ 数据流处理:日志服务、监控上报
分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择
+ 系统解耦:通讯上下行、终端异常监控、分布式事件中心
降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
## 核心概念
+ Broker(消息服务器)
Broker的概念来自与Apache ActiveMQ,通俗的讲就是MQ的服务器。
+ Producer(生产者)
业务的发起方,负责生产消息传输给broker
+ Consumer(消费者)
业务的处理方,负责从broker获取消息并进行业务逻辑处理
+ Topic(主题)
发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅 者,实现消息的广播
+ Message(消息体)
根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输
## 消息模式
### 点对点模型
点到点模型就是基本的队列模型, 可以将他理解为Go里面的channel, 只是 发送者(Sender)往消息队列(Queue)发送消息, 接收者(Receiver)从Queue中接收消息
![](./images/mq_ptp.png)
需要注意的是一旦被消费,消息就不再在消息队列中
### 发布订阅模型
当你的消息需要多个消费者都能收到时(也就是广播机制), 很显然点对点模式无法满足, 此时就会用到发布订阅模型, 这是最常用到的模型: 多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者
![](./images/mq_pb_sub.png)
由于采用类似于广播的模式, 它有如下特点:
+ 每个消息可以有多个消费者:和点对点方式不同,发布消息可以被所有订阅者消费
+ 发布者和订阅者之间有时间上的依赖性。
+ 针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
+ 为了消费消息,订阅者必须保持运行的状态
## Kafka
Kafka 是 Apache 的子项目,是一个高性能跨语言的分布式发布/订阅消息队列系统(没有严格实现 JMS 规范的点对点模型,但可以实现其效果),在企业开发中有广泛的应用。高性能是其最大优势,劣势是消息的可靠性(丢失或重复),这个劣势是为了换取高性能,开发者可以以稍降低性能,来换取消息的可靠性
![](./images/kafka.png)
### Topic与Partition
一个Topic可以认为是一类消息,每个topic将被分成多个partition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”
kafka高性能的一个重要原因也在于此, 因为append log是顺序IO
### 数据冗余方案
一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性。
基于replicated方案,那么就意味着需要对多个备份进行调度, 整个集群中只有1个leader节点负责人写, follower节点负责同步(leader和follower基于zk实现)
### 消费组
![](./images/con_group.png)
### 如何保证消息的顺序性
为了保证消息的有序性,kafka新增了一个概念: partition, partition中的数据一定是有序的。
生产者在写的时候 ,可以指定一个key,比如指定订单id作为key,这个订单相关数据一定会被分发到一个partition中去。消费者从partition中取出数据的时候也一定是有序的,把每个数据放入对应的一个内存队列,一个partition中有几条相关数据就用几个内存队列,消费者开启多个线程,每个线程处理一个内存队列
### 集群状态维护
比如一个consumer 在处理到1000条消息的时候突然挂了, 如果保证该consumer再次上线时, 能继续从之前下线的位置继续处理.
因此我们需要保存consumer当前的一个消息处理状态, 但kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息由zookeeper保存;
因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响
### 环境准备
这里环境采用Docker composej安装:
创建dock compose编排文件: docker-compose.yml
```yaml
version: '2'
services:
zoo1:
image: wurstmeister/zookeeper
restart: unless-stopped
hostname: zoo1
ports:
- "2181:2181"
container_name: zookeeper
kafka1:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1"
depends_on:
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
课程 Go语言基础 Day01 初识Go语言 开发环境搭建 第一个程序 基础语法 基础类型 变量常量与值 变量作用域 运算符 作业 Day02 MODULES工程 格式化输出 条件语句 读取标准输入 循环语句 数据类型转换 Go语言指针 作业 Day03 Go语言函数 defer与异常 Go语言数组 Go语言结构体 日期与时间 非类型安全指针 Go语言切片 作业 Day04 重点知识回顾与答疑 Day05 字符串 Go语言Map Go语言接口 错误处理 空接口与类型断言 函数式编程 面向对象 面向接口 作业 Day06 IO操作 反射与AST 文件读写 包与工程 单元测试和基准测试 Go语言进阶 Day09 密码学简介 散列算法 对称加密算法 非对称加密算法 密钥交换算法 数据结构之栈 算法的评估 数据结构之链表 数据结构之堆 作业 Day10 并发调度模型与Go GPM调度 Channel的使用 Channel与Select 锁和原子操作 CSP并发设计模式 并发注意事项 作业 Day11 互联网协议介绍 TCP编程 UDP编程 实战: Socket代理 HTTP与WebSocket
资源推荐
资源详情
资源评论
收起资源包目录
Go 语言课程和项目源码 (1091个子文件)
stdin_scan.go.bak 1KB
instancePtr.go.bak 660B
instance.go.bak 519B
MapReduce.go.bak 329B
functor.go.bak 318B
decorter.go.bak 304B
middleware.go.bak 219B
define.go.bak 141B
main.go.bak 100B
.browserslistrc 30B
.eslintrc.cjs 283B
.eslintrc.cjs 282B
.eslintrc.cjs 282B
.eslintrc.cjs 282B
.eslintrc.cjs 232B
base.css 2KB
base.css 2KB
base.css 2KB
base.css 2KB
main.css 477B
main.css 477B
main.css 477B
app.fb0c6e1c.css 343B
base.css 281B
index.css 44B
main.css 21B
.DS_Store 6KB
.DS_Store 6KB
demo.env 0B
.gitignore 302B
.gitignore 302B
.gitignore 302B
.gitignore 302B
.gitignore 302B
.gitignore 284B
.gitignore 231B
.gitignore 231B
.gitignore 231B
.gitignore 231B
header.pb.go 12KB
oneof.pb.go 9KB
repeated.pb.go 7KB
host.go 7KB
service.pb.go 7KB
service.pb.go 6KB
service_grpc.pb.go 5KB
any.pb.go 5KB
demo.go 5KB
import.pb.go 5KB
write.go 5KB
hello.pb.go 5KB
enum.pb.go 4KB
host.go 4KB
ecdh.go 3KB
start.go 3KB
hello_grpc.pb.go 3KB
fmt_print_test.go 3KB
strconv_test.go 3KB
read.go 3KB
func_test.go 3KB
config.go 3KB
dao.go 3KB
cbc.go 3KB
server.go 3KB
for_test.go 3KB
main.go 3KB
if_test.go 3KB
proto.go 2KB
unsafep_test.go 2KB
proto.go 2KB
host.go 2KB
interface.go 2KB
struct_test.go 2KB
array_test.go 2KB
password.go 2KB
list.go 2KB
main.tmpl.go 2KB
pubsub.go 2KB
queue.go 2KB
h_test.go 2KB
hash_test.go 2KB
main.go 2KB
barrier.go 2KB
aes.go 2KB
collector.go 2KB
script.go 2KB
slice_test.go 2KB
op_test.go 2KB
map_test.go 2KB
main.go 2KB
rsa_test.go 2KB
http.go 2KB
strings_test.go 2KB
sort_bench_test.go 2KB
model.go 2KB
main.go 2KB
stack.go 2KB
heap_big.go 1KB
list_test.go 1KB
switch_test.go 1KB
共 1091 条
- 1
- 2
- 3
- 4
- 5
- 6
- 11
资源评论
LeonDL168
- 粉丝: 3003
- 资源: 785
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 正在量产的新能源汽车控制器代码,功能齐全,含原理图 完美可编译
- python语言youkushipin爬虫程序代码QZQ1.txt
- python语言youkushipin爬虫程序代码QZQ2.txt
- python语言youkushipin爬虫程序代码QZQ.txt
- MATLAB滚动轴承故障诊断程序:采用西楚凯斯大学数据,首先通过变分模态分解(VMD)算法处理,而后分别通过包络谱分析实现故障诊断 ps.通过尖峰对应的频率与计算出的故障频率比较,实现故障诊断
- douyinshipin爬虫程序代码QZQ1.txt
- labview振动,声音分析软件,提供源代码
- 机器学习期末作业-基于决策树的医疗保险费花费预测python源码(含数据集+作业报告).zip
- 预测26个英文字母,mnist进阶版emnist,onnx模型,支持halcon直接read-dl-model
- 爱立信5G-KPI体系介绍
- 英飞凌电动汽车参考方案,包含原理图,和Bom清单,说明文档和代码,基于英飞凌TC27xC平台
- 2021防黑运营版,多商户机器人,在线客服系统,自助注册客服系统源码
- 大功率四轮电动车控制器代码, 原理图和Pcb,完整可用
- 计算机视觉中YOLOv11的目标检测与训练性能提升
- 男神女神投票 开源运营版本 优化报名送积分增减审核逻辑等
- 猫狗二分类,基于pytorch自带的vgg训练的,效果不好,但勉强能用,onnx模型,可以结合c#自己推理着玩玩
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功