SpringCloud(9)
stream消息驱动
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架
应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动.Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。(目前支持rabbitmq,kafka)
他出现的场景在于:
有没有一种新的技术诞生,让我们不再关注具体MQ的细节。我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。
我们就不需要考虑这么多rabbitmq,kafka,rocketmq等。这样可以降低开发难度和学习难度。
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
标准MQ:
引入Stream后
Binder
这个用来实现屏蔽消息中间件的差异
Stream中的消息通信方式遵循发布-订阅模式。Topic主题进行广播
常用标准流程套路
- 生产者
POM:YAML:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>业务类:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: #在此处配置要绑定的rabbitmq的服务信息:
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #消息组件类型
environment: #设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123
bindings: #服务的整合处理
output: #这个名字是一个通道的名称
destination: studyExchange #表示要使用的Exchange名称定义
content-type: application/json #设置消息类型,本次为json,文本则设置text/plain
binder: defaultRabbit #设置要绑定的消息服务的具体设置
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com #在信息列表时显示主机名称
prefer-ip-address: true #访间的路径变为IP地址
service:controller:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.messaging.Source
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.support.MessageBuilder
import top.zfxt.springcloud.service.IMessageProvider
import java.util.UUID
import javax.annotation.Resource
/**
* @author:zfxt
* @version:1.0
*/
//定义消息的推送管道
open class IMessageProviderImpl : IMessageProvider {
private lateinit var output: MessageChannel//消息发送管道
override fun send() {
val serial = UUID.randomUUID().toString()
output.send(MessageBuilder.withPayload(serial).build())
println("**********serial:$serial")
}
}1
2
3
4
5
6
7
8
9
10
11
12
class SendMessageController {
private lateinit var messageProvider: IMessageProvider
fun sendMessage(): String {
messageProvider.send()
return "success"
}
} - 消费者
pom保持和生产者一样
YML
1 | server: |
业务类:
1 |
|
重复消费问题
因为队列默认采用的是topic,所以每一条消息都会发送给所有的消费者,也就造成了所谓的重复消费的问题
即,不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费
只要放于同一个组中就可以解决重复消费问题。
消息持久化
加个group就会自动消息持久化。
如果没有加入group,那么在消费者尚未启动时,生产者产生消息会导致消息的丢失。而如果加入了消息持久化,他就可以把曾经未获得的消息重新获取并消费