【个人技术经验及开发技巧分享】 【个人技术经验及开发技巧分享】
首页
  • 操作系统初识
  • JAVA基础
  • JVM
  • 开发框架
  • Redis
  • Zookeeper
  • 消息中间件
  • 持久化
  • 算法
  • 网络
  • 系统架构
  • 并发编程
  • 框架
  • 开发杂货
  • 线上排查
  • 技巧备忘
  • 部署指南
  • 版本管理
  • 工作流程
  • 发版流程
  • 友情链接
  • 网站备忘
  • 在线工具
  • 学习
  • 各种云
  • 应用下载

Louis

首页
  • 操作系统初识
  • JAVA基础
  • JVM
  • 开发框架
  • Redis
  • Zookeeper
  • 消息中间件
  • 持久化
  • 算法
  • 网络
  • 系统架构
  • 并发编程
  • 框架
  • 开发杂货
  • 线上排查
  • 技巧备忘
  • 部署指南
  • 版本管理
  • 工作流程
  • 发版流程
  • 友情链接
  • 网站备忘
  • 在线工具
  • 学习
  • 各种云
  • 应用下载
  • 开发杂货

    • ES查询压测
    • Spring Cloud Stream
      • 1 简介
        • 1.1 概述
      • 2 快速搭建
        • 2.1 引入依赖
      • 3 开发指南
        • 3.1 apollo 增加配置stream.yml
        • 3.2: SpringBoot项目启动类,添加注解SpringBootApplication,EnableApolloConfig
        • 3.3: 创建消息通道绑定的接口
        • 3.4 发送消息
        • 3.5 接收消息
        • 3.6 接口测试
    • 线上Tomcat配置参考
    • 配置Prometheus及健康检测
    • Feign支持BasicAuth验证
    • Feign远程调用
    • Hystrix单方法熔断配置
    • 邮件发送自定义Excel
    • 本地开发联调配置
    • RabbitMQ配置备忘
    • Nacos配置中心
    • Java代码杂记
    • Oracle脚本备忘
    • Mysql并发数与连接数
    • 批量算费本地工具类
    • Apollo配置模糊查询
    • 异步任务AsyncIAE
    • 生产环境机器配置参考
  • 线上排查

  • 技巧备忘

  • 部署指南

  • 技术应用
  • 开发杂货
luoxiaofeng
2022-05-05
目录

Spring Cloud Stream

# 1 简介

# 1.1 概述

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。
通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

提示

目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。

# 2 快速搭建

我们通过一个简单的示例对 Spring Cloud Stream 有一个初步的认识。中间件使用 RabbitMQ,创建 spring-cloud-stream 模块。

# 2.1 引入依赖

编辑 pom.xml 文件,引入 Spring Cloud Stream 对 RabbitMQ 支持的 spring-cloud-starter-stream-rabbit 依赖,该依赖包是 Spring Cloud Stream 对 RabbitMQ 支持的封装,其中包含了对 RabbitMQ 的自动化配置等内容。

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
1
2
3
4
5
6
7
8
9
10
11
12

# 3 开发指南

# 3.1 apollo 增加配置stream.yml

spring:
  cloud:
    stream:
      bindings:
       #输入通道名称,对应java代码InputInterface定义的名称
        rabbit-mq-demo-test-input:
          #通道主题名
          destination: rabbit-mq-demo-test
          contentType: application/json
          #消费组名称, 多节点消费保证唯一
          group: rabbit-mq-demo-test
          #绑定的QM配置
          binder: rabbit-test
          consumer:
            enable-auto-commit: false
            auto-offset-reset: earliest
            max-poll-records: 1000
            concurrency: 6

        rabbit-mq-demo-test-output:
          destination: rabbit-mq-demo-test
          contentType: application/json
          group: 
          binder: rabbit-test
          consumer:
            enable-auto-commit: false
            auto-offset-reset: earliest
            max-poll-records: 1000
            concurrency: 6
            
        demo-test-input:
          destination: demo-test
          contentType: application/json
          group: demo-test
          binder: kafka-test
          consumer:
            enable-auto-commit: false
            auto-offset-reset: earliest
            max-poll-records: 1000
            concurrency: 6
            
        demo-test-output:
          destination: demo-test
          contentType: application/json
          group: demo-test
          binder: kafka-test
          consumer:
            enable-auto-commit: false
            auto-offset-reset: earliest
            max-poll-records: 1000
            concurrency: 6
      #默认配置, 当存在多个配置事必须配置, 否则获取不到MQ配置
      default-binder: rabbit-test    
      binders:
      #RabbitMQ配置
        rabbit-test:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: xxx.xxx.xxx.xxx
                port: 5672
                username: guest
                password: guest
                virtual-host: /
        #Kafka配置     
        kafka-test:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: xxx.xxx.xxx.xxx:9092
                      auto-add-partitions: true
                      auto-create-topics: true
                      min-partition-count: 1
                      replication-factor: 3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

# 3.2: SpringBoot项目启动类,添加注解SpringBootApplication,EnableApolloConfig

@EnableApolloConfig({"stream.yml"})
@SpringBootApplication(exclude = {RabbitAutoConfiguration.class}) 
public class PlatformDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(PlatformDemoApplication.class, args);
    }

}
1
2
3
4
5
6
7
8
9

屏蔽Rabbit org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect 错误

# 3.3: 创建消息通道绑定的接口

创建 InputInterface 接口,通过 @Input 注解定义输入通道和输出通道,另外,@Input 注解都还有一个 value 属性,该属性可以用来设置消息通道的名称,

RabbitMQ: rabbit-mq-demo-test-input

Kafka: demo-test-input

@Component
public interface InputInterface {

    //RabbitMQ接收者通道
    String RABBIT_MQ_DEMO_TEST_INPUT = "rabbit-mq-demo-test-input";

    //Kafka接收者通道
    String KAFKA_DEMO_TEST_INPUT = "demo-test-input";


    @Input(RABBIT_MQ_DEMO_TEST_INPUT)
    SubscribableChannel rabbitMQInput();

    @Input(KAFKA_DEMO_TEST_INPUT)
    SubscribableChannel kafkaSendInput();

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

创建 OutputInterface接口,通过@Output 注解定义输入通道和输出通道,另外@Output 注解都还有一个 value 属性,该属性可以用来设置消息通道的名称,

RabbitMQ: rabbit-mq-demo-test-output

Kafka: demo-test-output

@Component
public interface OutputInterface {

    //RabbitMQ接收者通道
    String RABBIT_MQ_DEMO_TEST_INPUT = "rabbit-mq-demo-test-output";

    //Kafka接收者通道
    String KAFKA_DEMO_TEST_INPUT = "demo-test-output";


    @Output(RABBIT_MQ_DEMO_TEST_INPUT)
    SubscribableChannel rabbitMQSendMessage();

    @Output(KAFKA_DEMO_TEST_INPUT)
    SubscribableChannel kafkaSendMessage();

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 3.4 发送消息

创建测试消息实体MessageDTO

@Data
public class MessageDTO {

    /**
     * ID
     */
    private Integer id;
    /**
     * 编码
     */
    private String code;
    /**
     * 名称
     */
    private String name;
    /**
     * 模块名称
     */
    private String module;
    /**
     * 操作类型
     */
    private String operation;
    /**
     * 冗余字段
     */
    private String json;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

创建MqMessageService接口以及MqServiceImpl实现类

public interface MqMessageService {

    boolean sendRabbitMqMessage(MessageDTO dto);

    boolean sendKafkaMessage(MessageDTO dto);

}
@Slf4j
@Service
@EnableBinding(value = {OutputInterface.class})
public class MqMessageServiceImpl implements MqMessageService {

    @Autowired
    private OutputInterface outputInterface;

    @Override
    public boolean sendRabbitMqMessage(MessageDTO dto) {
        Message message = MessageBuilder.withPayload(dto).build();
        return outputInterface.rabbitMQSendMessage().send(message);
    }

    @Override
    public boolean sendKafkaMessage(MessageDTO dto) {
        Message message = MessageBuilder.withPayload(dto).build();
        return outputInterface.kafkaSendMessage().send(message);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 3.5 接收消息

创建监听器InputStreamListener

@Slf4j
@EnableBinding(InputInterface.class)
@Component
public class InputStreamListener {

    @StreamListener(value = InputInterface.RABBIT_MQ_DEMO_TEST_INPUT)
    public void showRabbitMQMessage(@Payload MessageDTO dto) {
        log.info("showRabbitMQMessage message :[{}]", dto);

    }
    
    @StreamListener(value = InputInterface.KAFKA_DEMO_TEST_INPUT)
    public void showKafkaMessage(@Payload MessageDTO dto) {
        log.info("showKafkaMessage message:{}", dto);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 3.6 接口测试

1.创建MQController

@Api(value = "MQController", tags = "MQ测试")
@RequestMapping("/mqDemo")
@RestController
@Slf4j
public class MQController {

    @Autowired
    private MqMessageService mqMessageService;


    @ApiOperation(value = "发送RabbitMQ消息", notes = "发送RabbitMQ消息")
    @PostMapping("sendRabbitMqMessage")
    public Result<Boolean> sendRabbitMqMessage(@Valid @RequestBody MessageDTO messageDTO) {
        return Result.success(mqMessageService.sendRabbitMqMessage(messageDTO));
    }

    @ApiOperation(value = "发送Kafka消息", notes = "发送Kafka消息")
    @PostMapping("sendKafkaMessage")
    public Result<Boolean> sendKafkaMessage(@Valid @RequestBody MessageDTO messageDTO) {
        return Result.success(mqMessageService.sendKafkaMessage(messageDTO));
    }
    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

2.Postman测试

RabbitMq

发送接口:http://localhost:8080/demoapi/mqDemo/sendRabbitMqMessage

测试数据

{
    "id":1,
    "code":"code",
    "name":"name",
    "module":"module",
    "operation":"INSERT",
    "json":"发送RabbitMQ数据"

}
1
2
3
4
5
6
7
8
9

监听器接收RabbitMQ数据

c.y.p.d.m.l.InputStreamListener - showRabbitMQMessage message :[MessageDTO(id=1, code=code, name=name, module=module, operation=INSERT, json=发送RabbitMQ数据)]
1

Kafka

发送接口:http://localhost:8080/demoapi/mqDemo/sendKafkaMessage

{
    "id":2,
    "code":"code",
    "name":"name",
    "module":"module",
    "operation":"INSERT",
    "json":"发送Kafka数据"

}
1
2
3
4
5
6
7
8
9

监听器接收Kafka数据

INFO  c.y.p.d.m.l.InputStreamListener - showKafkaMessage message:MessageDTO(id=2, code=code, name=name, module=module, operation=INSERT, json=发送Kafka数据)
1
#Spring Cloud
ES查询压测
线上Tomcat配置参考

← ES查询压测 线上Tomcat配置参考→

最近更新
01
SpringBoot
10-21
02
Spring
10-20
03
Sentinel
10-14
更多文章>
Copyright © 2022-2023 Louis | 粤ICP备2022060093号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式