stream消息驱动

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架

应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动.Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。(目前支持rabbitmq,kafka)

他出现的场景在于:
有没有一种新的技术诞生,让我们不再关注具体MQ的细节。我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。
我们就不需要考虑这么多rabbitmq,kafka,rocketmq等。这样可以降低开发难度和学习难度。
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
标准MQ:

引入Stream后

Binder

这个用来实现屏蔽消息中间件的差异

Stream中的消息通信方式遵循发布-订阅模式。Topic主题进行广播

常用标准流程套路


  • 生产者
    POM
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>

    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    YAML
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    server:
    port: 8801

    spring:
    application:
    name: cloud-stream-provider

    cloud:
    stream:
    binders: #在此处配置要绑定的rabbitmq的服务信息:
    defaultRabbit: #表示定义的名称,用于binding整合
    type: rabbit #消息组件类型
    environment: #设置rabbitmq的相关的环境配置
    spring:
    rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123

    bindings: #服务的整合处理
    output: #这个名字是一个通道的名称
    destination: studyExchange #表示要使用的Exchange名称定义
    content-type: application/json #设置消息类型,本次为json,文本则设置text/plain
    binder: defaultRabbit #设置要绑定的消息服务的具体设置

    eureka:
    client:
    service-url:
    defaultZone: http://localhost:7001/eureka
    instance:
    lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com #在信息列表时显示主机名称
    prefer-ip-address: true #访间的路径变为IP地址
    业务类:

    service:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    import org.springframework.cloud.stream.annotation.EnableBinding
    import org.springframework.cloud.stream.messaging.Source
    import org.springframework.messaging.MessageChannel
    import org.springframework.messaging.support.MessageBuilder
    import top.zfxt.springcloud.service.IMessageProvider
    import java.util.UUID
    import javax.annotation.Resource

    /**
    * @author:zfxt
    * @version:1.0
    */
    @EnableBinding(Source::class)//定义消息的推送管道
    open class IMessageProviderImpl : IMessageProvider {

    @Resource
    private lateinit var output: MessageChannel//消息发送管道
    override fun send() {
    val serial = UUID.randomUUID().toString()
    output.send(MessageBuilder.withPayload(serial).build())
    println("**********serial:$serial")
    }

    }
    controller:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @RestController
    class SendMessageController {

    @Resource
    private lateinit var messageProvider: IMessageProvider

    @GetMapping("/sendMessage")
    fun sendMessage(): String {
    messageProvider.send()
    return "success"
    }
    }
  • 消费者

pom保持和生产者一样
YML

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
server:
port: 8802

spring:
application:
name: cloud-stream-consumer

cloud:
stream:
binders: #在此处配置要绑定的rabbitmq的服务信息:
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #消息组件类型
environment: #设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123

bindings: #服务的整合处理
input: #这个名字是一个通道的名称
destination: studyExchange #表示要使用的Exchange名称定义
content-type: application/json #设置消息类型,本次为json,文本则设置text/plain
binder: defaultRabbit #设置要绑定的消息服务的具体设置

eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔(默认是90秒)
instance-id: receive-8802.com #在信息列表时显示主机名称
prefer-ip-address: true #访间的路径变为IP地址

业务类:

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@EnableBinding(Sink::class)
open class ReceiveMessageController {

@Value("\${server.port}")
private lateinit var serverPort: String

@StreamListener(Sink.INPUT)
fun input(message: Message<String>) {
println("消费者一号,----》接受到的消息:${message.payload} \t port:$serverPort")
}
}

重复消费问题

因为队列默认采用的是topic,所以每一条消息都会发送给所有的消费者,也就造成了所谓的重复消费的问题

即,不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费
只要放于同一个组中就可以解决重复消费问题。

消息持久化

加个group就会自动消息持久化。
如果没有加入group,那么在消费者尚未启动时,生产者产生消息会导致消息的丢失。而如果加入了消息持久化,他就可以把曾经未获得的消息重新获取并消费