Spring Cloud Stream 使用场景:消息驱动的微服务应用 目的: 简化编码 统一抽象(门面模式思想) 基本概念 Source: Stream发送源,类似Produer、Publisher Sink:Stream接收器,类似Consumer、Subscriber Pr
Spring Cloud Stream
- 使用场景:消息驱动的微服务应用
- 目的:
- 简化编码
- 统一抽象(门面模式思想)
基本概念
- Source: Stream发送源,类似Produer、Publisher
- Sink:Stream接收器,类似Consumer、Subscriber
- Processor:处理管道
与RabbitMQ整合
改造user-service-client 消息发送源(Source)
依赖
<dependency><groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
定义UserMessage接口,用于输出
/*** 用户消息(输出)
*/
public interface UserMessage {
@Output("user-message-out")
MessageChannel output();
}
激活绑定
@EnableBinding(UserMessage.class)public class RibbonClientApplication {
修改配置文件
# Spring Cloud Stream Binding 配置# destination 指定 Rabbit MQ Topic:users
# user-message-out 为输出管道名称
spring.cloud.stream.bindings.user-message-out.destination = users
添加一个发送消息的Controller:
@RestControllerpublic class UserServiceClientController {
@Autowired
private UserMessage userMessage;
@Autowired
private ObjectMapper objectMapper;
@PostMapping("/user/save/message")
public boolean saveUserByRabbitMessage(@RequestBody User user) throws JsonProcessingException {
MessageChannel messageChannel = userMessage.output();
// User 序列化成 JSON
String payload = objectMapper.writeValueAsString(user);
GenericMessage<String> message = new GenericMessage<>(payload);
// 发送消息
return messageChannel.send(message);
}
}
User对象实现序列化接口
user-api项目
@Datapublic class User implements Serializable{
private static final long serialVersionUID = -3345217474278625920L;
private String id;
private String name;
}
改造user-service-provider消息接收器(Sink)
依赖
<dependency><groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
定义用户接口
package com.learn.service.provider.stream;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface UserMessage {
@Input
SubscribableChannel input();
}
激活
@EnableBinding(UserMessage.class) //激活Stream Binding 到 UserMessagepublic class UserServiceProviderApplication {
修改配置文件
spring:application:
name: user-service-provider
cloud:
stream:
bindings:
input: #@input注解的方法名称
destination: users # 和消息发送方保持一致
Bindings配置格式为:
- Source: spring.cloud.stream.bindings.${source}.*
- Sink: spring.cloud.stream.bindings.${sink}.*
添加一个Service来消费这个消息
@Servicepublic class UserMessageService {
@Autowired
private UserMessage userMessage;
@Autowired
@Qualifier("userServiceInMemory")
private UserService userService;
@Autowired
private ObjectMapper objectMapper;
@PostConstruct
public void init() {
SubscribableChannel subscribableChannel = userMessage.input();
subscribableChannel.subscribe(message -> {
String body = (String) message.getPayload();
System.out.println("user:" + body);
try {
User user = objectMapper.readValue(body,User.class);
userService.saveUser(user);
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
测试
通过POSTMAN 发送POST请求 到 http://localhost:6060/user-service-client/user/save/message
传送的内容为:
{"id":"3",
"name":"张家球"
}
访问 http://localhost:6060/user-service-provider/user
得到结果为:
[{
"id": "2",
"name": "王五"
},
{
"id": "3",
"name": "张家球"
}
]
访问RabbitMQ控制台可以看到有消息的起伏(多发送几次消息即可)