当前位置 : 主页 > 编程语言 > java >

SpringCloud——消息驱动

来源:互联网 收集:自由互联 发布时间:2022-07-14
Spring Cloud Stream 使用场景:消息驱动的微服务应用 目的: 简化编码 统一抽象(门面模式思想) 基本概念 Source: Stream发送源,类似Produer、Publisher Sink:Stream接收器,类似Consumer、Subscriber Pr


Spring Cloud Stream

  • 使用场景:消息驱动的微服务应用
  • 目的:
  • 简化编码
  • 统一抽象(门面模式思想)

基本概念

  • Source: Stream发送源,类似Produer、Publisher
  • Sink:Stream接收器,类似Consumer、Subscriber
  • Processor:处理管道

与RabbitMQ整合

改造user-service-client 消息发送源(Source)

依赖

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

定义UserMessage接口,用于输出

/**
* 用户消息(输出)
*/
public interface UserMessage {

@Output("user-message-out")
MessageChannel output();
}

激活绑定

@EnableBinding(UserMessage.class)
public class RibbonClientApplication {

修改配置文件

# Spring Cloud Stream Binding 配置
# destination 指定 Rabbit MQ Topic:users
# user-message-out 为输出管道名称
spring.cloud.stream.bindings.user-message-out.destination = users

添加一个发送消息的Controller:

@RestController
public class UserServiceClientController {
@Autowired
private UserMessage userMessage;

@Autowired
private ObjectMapper objectMapper;

@PostMapping("/user/save/message")
public boolean saveUserByRabbitMessage(@RequestBody User user) throws JsonProcessingException {
MessageChannel messageChannel = userMessage.output();
// User 序列化成 JSON
String payload = objectMapper.writeValueAsString(user);
GenericMessage<String> message = new GenericMessage<>(payload);
// 发送消息
return messageChannel.send(message);
}
}

User对象实现序列化接口

user-api项目

@Data
public class User implements Serializable{
private static final long serialVersionUID = -3345217474278625920L;
private String id;
private String name;
}

改造user-service-provider消息接收器(Sink)

依赖

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

定义用户接口

package com.learn.service.provider.stream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface UserMessage {
@Input
SubscribableChannel input();
}

激活

@EnableBinding(UserMessage.class) //激活Stream Binding 到 UserMessage
public class UserServiceProviderApplication {

修改配置文件

spring:
application:
name: user-service-provider
cloud:
stream:
bindings:
input: #@input注解的方法名称
destination: users # 和消息发送方保持一致

Bindings配置格式为:

  • Source: spring.cloud.stream.bindings.${source}.*
  • Sink: spring.cloud.stream.bindings.${sink}.*

添加一个Service来消费这个消息

@Service
public class UserMessageService {
@Autowired
private UserMessage userMessage;

@Autowired
@Qualifier("userServiceInMemory")
private UserService userService;

@Autowired
private ObjectMapper objectMapper;

@PostConstruct
public void init() {
SubscribableChannel subscribableChannel = userMessage.input();
subscribableChannel.subscribe(message -> {
String body = (String) message.getPayload();
System.out.println("user:" + body);
try {
User user = objectMapper.readValue(body,User.class);
userService.saveUser(user);
} catch (IOException e) {
e.printStackTrace();
}
});
}

}

测试

通过POSTMAN 发送POST请求 到 ​​http://localhost:6060/user-service-client/user/save/message​​

传送的内容为:

{
"id":"3",
"name":"张家球"
}

访问 ​​http://localhost:6060/user-service-provider/user​​

得到结果为:

[
{
"id": "2",
"name": "王五"
},
{
"id": "3",
"name": "张家球"
}
]

访问RabbitMQ控制台可以看到有消息的起伏(多发送几次消息即可)

SpringCloud——消息驱动_spring


上一篇:Java中的优先队列——二叉堆
下一篇:没有了
网友评论