RabbitMq入门
消息队列
MQ是一种常见的上下游“逻辑解耦+物理解耦”的消息通信服务
MQ三大功能:流量消峰,应用解耦,异步处理
主流MQ:kafka,rocketMQ,rabbitMQ
RabbitMQ
4大核心概念
生产者,消费者,交换机,队列
RabbitMQ的安装
这里使用docker的方式进行安装
- 拉取镜像
1
docker pull rabbitmq
- 启动进程这时已经算是启动好了应用,但是任然无法访问
1
docker run -d --restart=always --name my-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq
- 开启插件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// 1、进入容器
docker exec -it my-rabbitmq bash
// 2、开启插件
rabbitmq-plugins enable rabbitmq_management
//创建账号
rabbitmqctl add_user admin 123
//设置用户角色
rabbitmqctl set_user_tags admin administrator
//设置用户权限
//(默认最大)
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
//查看用户角色
rabbitmqctl list_users
图标问题无法显现
🪁解决方案
🐳 1、查看所有容器(看id)
docker ps -a
🐳 2、进入容器内部
docker exec -it 容器id /bin/bash
🐳3、进入指定目录
cd /etc/rabbitmq/conf.d/
🐳4、修改 management_agent.disable_metrics_collector = false
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
🐳5、退出容器
exit
🐳6、重启容器
docker restart 容器Id
原文来自csdn
生产者代码
1 | package top.zfxt.rabbitmq.one |
消费者代码
1 | package top.zfxt.rabbitmq.one |
Work Queue(简单模式和工作模式)
情景:出现生产者产生大量的消息需要通过消费者去执行,而每个消息,只能被处理一次,不能处理多次,只能被一个消费者处理。
因此为了能够提高效率,采用轮询的方式,当一个消费者获得消息后,就让另一个消费者获得下一个消息。
消息应答
情景:如果一个消费者正在处理消息时,突然宕机,而队列中该消息已经删除。就会导致消息丢失。
因此为了保证消息不丢失。引入了应答机制。只有在消费者处理了消息并且应答之后,rabbitMQ才会删除该消息。
- 自动应答:
他的做法是,当消费者接收到消息内容后即刻返回应答,未必完成处理。这种多适用于高吞吐量时,与数据传输安全做权衡。
尽量少使用 - 手动应答
Channel.basicAck()
肯定应答Channel.basicNack()
否定应答Channel.basicReject()
直接丢弃该消息,直接拒绝
优点:他可以批量应答:Multiple,
消息自动重新入队
如果消息在消费者手中处理时,由于网络等问题无法发送ack确认信息。rabbitMQ会将该消息重新入队。这样可以确保不会丢失信息。
消息持久化
用于保证消息不会丢失,当出现了rabbitMQ奔溃宕机时,确保消息不丢失,就需要使用到消息持久化
- 队列持久化
即使重启rabbitmq,这个队列依然存在
(即修改duration属性为true) - 消息持久化
保证消息会被保存到磁盘上,不会丢失。但是它并不能完全保证不会丢失消息。他会存在一些延迟
(修改props属性,添加MessageProperties.PERSISTENT_TEXT_PLAIN)
(完整做法是加上发布确认)
在生产者的代码中加上1
2//开启发布确认
channel.confirmSelect()
确认发布
单个确认发布
同步确认消息。单个发布单个确认。缺点:发布速度特别慢1
2
3
4
5//单个消息发布后立刻确认(确认后返回true)
var flag = channel.waitForConfirms()
if(flag){
println("消息发送成功")
}批量确认发布
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17for (i in 0..MESSAGE_COUNT) {
val message = "message $i"
channel.basicPublish("",QUEUE_NAME,null,message.toByteArray())
//单个消息发布后立刻确认(确认后返回true)
var flag = channel.waitForConfirms()
if(flag){
println("消息发送成功")
}
//批量确认
if(i%100==0){
var flag = channel.waitForConfirms()
if(flag){
println("消息发送成功")
}
}
}缺点:如果出现确认问题,无法判断是那个消息出现的错误
异步确认发布
异步确认发布,比上面两个操作都要复杂。但是他的性价比最高。可以容易的判断是哪个消息出了问题1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42fun publicMessageAsync() {
val channel = RabbitMqUtils.getChannel()
//队列名称
val QUEUE_NAME = UUID.randomUUID().toString()
channel.queueDeclare(QUEUE_NAME, false, false, false, null)
//开启发布确认
channel.confirmSelect()
/**
* 建立一个线程安全有序的一个哈希表,适用于高并发的情况下
* 1. 轻松的将序号与消息关联
* 2. 轻松批量删除消息条目
* 3. 支持多线程访问
*/
val outstandingConfirms = ConcurrentSkipListMap<Long, String>()
//准备监听器
channel.addConfirmListener({ deliveryTag, multiple ->
//监听成功
println("确认的消息:$deliveryTag")
//删除掉已经确认的消息
//判断是否批量
if (multiple) {
outstandingConfirms.headMap(deliveryTag).clear()
} else {
outstandingConfirms.remove(deliveryTag)
}
}, { deliveryTag, multiple ->
//监听失败
println("未确认的消息:$deliveryTag")
//如果有未确认的消息
println(outstandingConfirms[deliveryTag] + "未被确认")
})
println("发送一千条数据耗时:" + measureTimeMillis {
for (i in 0..MESSAGE_COUNT) {
val message = "message $i"
channel.basicPublish("", QUEUE_NAME, null, message.toByteArray())
//记录下要发送消息的总和
outstandingConfirms.put(channel.nextPublishSeqNo, message)
}
})
}//1首先确定消息的总和
//2在确认消息中,把确认的消息删除掉
//3打印未确认的消息
不公平分发
对于最开始发送消息时的轮询分发(也就是一人发一条信息,轮流接受)这就导致有消费者处理信息非常慢。影响整体效率。
因此采用能者多劳
1 | channel.basicQos(1) |
预取值
根据预取值分发消息,按照比例分配消息。
1 | //分配几条就发几条 |
发布和订阅模式
此前用到的都是一个生产者发布一个消息,并交由一个消费者消费。而交换机的一个作用就是实现消息的”发布和订阅”,即一消息多接受
交换机
rabbitMQ的核心思想:生产者的消息从不会直接发送给队列。也就是队列的状态对于生产者是透明的。
exchanges类型
direct(直接)
直接交换机的绑定是使用具体的routingKey来进行绑定,这样每个消息,只能发送到routingKey绑定的队列上。
默认的无名交换机就是直接交换机。
临时队列:创建出来的一个具有随机名字,没有持久化,当消费者断开连接后即自动删除的队列。1
channel.queueDeclare().queue
直接获取临时队列
topit(主题)
主题交换机的routingKey有限制要求:必须是一个单词列表,以点号分开(总共不能超过255个字节)*
可以代替一个单词。#
可以代替0个或多个单词
简称:通配符匹配headers(标题)
fanout(扇出)
发布/订阅模式
接收者:1
2
3
4
5
6
7
8
9
10
11
12
13
14fun main(args: Array<String>) {
val channel = RabbitMqUtils.getChannel()
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
//声明一个临时队列
val queueName = channel.queueDeclare().queue
//绑定交换机和队列
channel.queueBind(queueName, EXCHANGE_NAME, "")
println("等待接受消息,把接收到的消息打印出来")
channel.basicConsume(queueName, true, { consumerTag, message ->
println("接收到:${String(message.body)}")
}, { consumerTag -> })
}发布者:
1
2
3
4
5
6
7
8
9
10fun main(args: Array<String>) {
val channel = RabbitMqUtils.getChannel()
//交换机已经生成,不用再次创建
// channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
while (true) {
var message = readLine()
channel.basicPublish(EXCHANGE_NAME, "", null, message?.toByteArray())
println("发出消息:$message")
}
}
死信队列
无法被消费的消息。由于某些特定的原因导致queue中的某些消息无法被消费
,这样的消息如果没有被后续处理,就变成了死信,从而就有了死信队列。
应用场景:为了保证订单业务的数据不丢失。需要使用到RabbitMq的死信队列机制。
来源:
- 消息TTL过期
- 队列达到最大长度
- 消息被拒,并且不放回队列中
生产者代码:消费者1代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14fun main(args: Array<String>) {
val channel = RabbitMqUtils.getChannel()
//死信时间设置
val properties = AMQP.BasicProperties
.Builder()
.expiration("10000")
.build()
for (i in 0..10) {
val message = "info-$i"
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan",properties ,message.toByteArray())
}
}消费者2代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35fun main(args: Array<String>) {
val channel = RabbitMqUtils.getChannel()
//声明交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, "direct")
channel.exchangeDeclare(DEAD_EXCHANGE, "direct")
//声明队列
val arguments = HashMap<String, Any>()
//过期时间设置
//arguments["x-message-ttl"] = 10000
//可以通过生产者设置时间
//2. 设置队列最大长度,(超过长度的会变为死信内容)
arguments["x-max-length"] = 6
//正常队列,设置死信后交换机
arguments["x-dead-letter-exchange"] = DEAD_EXCHANGE
//设置死信routingKey
arguments["x-dead-letter-routing-key"] = "lisi"
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments)
//==============================================================================================//
channel.queueDeclare(DEAD_QUEUE, false, false, false, null)
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan")
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi")
channel.basicConsume(NORMAL_QUEUE, true, { consumerTag, message ->
println(String(message.body, charset("UTF-8")))
}, { consumerTag -> })
}1
2
3
4
5
6
7
8fun main(args: Array<String>) {
val channel = RabbitMqUtils.getChannel()
channel.basicConsume(DEAD_QUEUE, true, { consumerTag, message ->
println(String(message.body, charset("UTF-8")))
}, { consumerTag -> })
}
延迟队列
就是死信队列中所谓的TTL(消息最大保存时间)
延迟队列就是需要指定时间被处理的队列
使用场景:如订单10分钟未支付则自动取消
springboot整合
依赖包导入:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation ("javax.servlet:javax.servlet-api:4.0.1")
implementation("org.springframework.boot:spring-boot-starter")
// rabbitMQ依赖
implementation("org.springframework.boot:spring-boot-starter-amqp")
implementation("com.squareup.moshi:moshi:1.13.0")
//注解
compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
//swagger
implementation("io.springfox:springfox-swagger2:3.0.0")
implementation("io.springfox:springfox-swagger-ui:3.0.0")
//rabbitMQ测试依赖
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.amqp:spring-rabbit-test")
}配置项,配置交换机和队列以及他们的关系
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class TtlQueueConfig {
companion object{
//普通交换机
val X_EXCHANGE = "X"
//死信交换机
val Y_DEAD_LETTER_EXCHANGE = "Y"
//普通队列
val QUEUE_A = "QA"
val QUEUE_B = "QB"
//死信队列
val QUEUE_DEAD_LETTER = "QD"
}
//声明xExchange
fun xExchange(): DirectExchange {
return DirectExchange(X_EXCHANGE)
}
//声明yExchange
fun yExchange(): DirectExchange {
return DirectExchange(Y_DEAD_LETTER_EXCHANGE)
}
//声明普通队列TTL为10s
fun queueA(): Queue {
val arguments = HashMap<String, Any>()
arguments["x-message-ttl"] = 10000
arguments["x-dead-letter-exchange"] = Y_DEAD_LETTER_EXCHANGE
arguments["x-dead-letter-routing-key"] = "YD"
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build()
}
//声明普通队列TTL为40s
fun queueB(): Queue {
val arguments = HashMap<String, Any>()
arguments["x-message-ttl"] = 40000
arguments["x-dead-letter-exchange"] = Y_DEAD_LETTER_EXCHANGE
arguments["x-dead-letter-routing-key"] = "YD"
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build()
}
//死信队列
fun queueD(): Queue {
return QueueBuilder.durable(QUEUE_DEAD_LETTER).build()
}
//绑定
fun queueABindingX(Queue, exchange: DirectExchange) queueA: : Binding {
return BindingBuilder.bind(queueA).to(exchange).with("XA")
}
fun queueBBindingX(Queue, exchange: DirectExchange) queueB: : Binding {
return BindingBuilder.bind(queueB).to(exchange).with("XB")
}
fun queueDBindingY(Queue, exchange: DirectExchange) queueD: : Binding {
return BindingBuilder.bind(queueD).to(exchange).with("YD")
}
}设置生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class SendMsgController {
private val log = LoggerFactory.getLogger(this.javaClass)
private lateinit var rabbitTemplate: RabbitTemplate
//开始发消息
fun sendMsg(String) message: {
log.info("当前时间:${Date().toString()},发送一条消息给两个TTL队列:$message")
rabbitTemplate.convertAndSend("X", "XA", "消息消过期时间为10s:$message")
rabbitTemplate.convertAndSend("X", "XB", "消息消过期时间为40s:$message")
}
}设置消费者
1
2
3
4
5
6
7
8
9
10class DeadLetterQueueConsumer {
private val log = LoggerFactory.getLogger(this.javaClass)
//接受消息
fun reveiceD(message:Message,channel:Channel){
var msg = String(message.body)
log.info("当前时间:${Date().toString()},收到死信队列的消息:$msg")
}
}通过生产者设置过期时间
1 | //发消息和延迟时间 |
- 出现问题(队列先进先出)
这个是基于死信队列才会产生的问题。因此采用基于插件的方式去解决
基于插件解决延迟队列
安装延时队列插件
可去官网下载 rabbitmq_delayed_message_exchange 插件,放置到 RabbitMQ 的插件目录。
进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ
1 | [root@VM-0-6-centos software]# ls |
在这里新增了一个队列delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
- 配置文件类代码:
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
1 |
|
- 生产者代码
1
2
3
4
5
6
7
8
9
public void sendMsg( { String message, Integer delayTime)
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
correlationData -> {
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message);
}
总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景
发布确认-springboot集成
在生产环境中,出现特殊情况导致rabbitMQ重启。如何去手动回复和处理。
发布确认,(交换机)
配置确认发布
在application.yml
中配置:1
publisher-confirm-type: correlated
他有三个选项:1. NONE,默认,不开启发布确认。2. CORRELATED,发布消息成功后触发回调方法。3. SIMPLE,他的第一个效果和CORRELATED一样,第二个效果是如果他接受到
waitForConfirmsOrDie
的返回值为false时,会直接关闭channel信道。生产者代码:
1
2
3
4
5
6//发消息
fun send(String) msg: {
rabbitTemplate.convertAndSend("confirm_exchange", "key1", msg, CorrelationData("1"))
log.info("发送消息:$msg")
}
他需要配置回调接口
1 |
|
- 消费者代码:
1
2
3
4
5
6
7
8
class ConfirmConsumer {
private val log = LoggerFactory.getLogger(javaClass)
fun receive(msg: Message) {
log.info("接收到的消息:{}", String(msg.body))
}
}
回退消息
通过设置mandatory
参数可以将消息传递过程中不可达目的的消息返还给生产者
配置项:publisher-returns: true
然后再配置回退消息的回调函数,然后执行即可:
1 |
|
备份交换机
优点:可以备份消息和报警
他的关键点是他的正常交换机参数需要设置一个
当正常交换机出现问题时,他会进入备份交换机。(与死信交换机类似)。只是死信交换机是消息无法正确得到解决会进入死信交换机。
而消息如果无法正常进入交换机时,会进入备份交换机
如果消息回退和备份交换机同时开启,谁会优先执行:经检测:备份交换机优先级更高
幂等性
应用场景:消息重复消费。消费者再消费MQ时,已经处理完了该信息,但是再返回ack确认时,网络中断。故MQ会重新发送这条消息给该消费者或其他消费者。这就会导致重复消费。
解决办法:使用全局ID表示。可以用时间戳或者UUID的一个全局唯一ID。
两种操作
a. 唯一ID+指纹码机制
b. 利用redis的原子性去实现
redis原生具有setnx。天生具有幂等性判断。
优先级队列
使用场景:订单催送。
一般情况下,队列中的消息是先进先出的顺序。但是使用了优先级队列,那么优先级高的可以先被消费
官方允许最大优先级的数量为(0-255)
他的关键实现就是对队列设置优先级和消息设置优先级。
惰性队列
惰性队列的消息是保存在磁盘中的。他的消费速度会比较慢
常用于消费者宕机或者下线维护时。这样子就不会消息堆积导致消息丢失了。