当前位置 : 主页 > 编程语言 > java >

Spring Boot 整合 Reactor实例详解

来源:互联网 收集:自由互联 发布时间:2023-01-30
目录 引言 1 创建项目 2 集成 H2 数据库 3 创建测试类 3.1 user 实体 3.2 UserRepository 3.3 UserService 3.4 UserController 3.5 SpringReactorApplication 添加注解支持 测试 总结 引言 Reactor 是一个完全 非阻塞
目录
  • 引言
  • 1 创建项目
  • 2 集成 H2 数据库
  • 3 创建测试类
    • 3.1 user 实体
    • 3.2 UserRepository
    • 3.3 UserService
    • 3.4 UserController
    • 3.5 SpringReactorApplication 添加注解支持
  • 测试
    • 总结

      引言

      Reactor 是一个完全非阻塞的 JVM 响应式编程基础,有着高效的需求管理(背压的形式)。它直接整合 Java8 的函数式 API,尤其是 CompletableFuture, Stream,还有 Duration 。提供了可组合的异步化序列 API — Flux (对于 [N] 个元素) and Mono (对于 [0|1] 元素) — 并广泛实现 响应式Stream 规范。

      这次带大家从零开始,使用 Spring Boot 框架建立一个 Reactor 响应式项目。

      1 创建项目

      使用 ;https://start.spring.io/ 创建项目。添加依赖项:H2、Lombok、Spring Web、JPA、JDBC

      然后导入 Reactor 包

      <dependency>
          <groupId>io.projectreactor</groupId>
          <artifactId>reactor-core</artifactId>
      </dependency>
      <dependency>
          <groupId>io.projectreactor</groupId>
          <artifactId>reactor-test</artifactId>
          <scope>test</scope>
      </dependency>

      2 集成 H2 数据库

      application.properties 文件中添加 H2 数据连接信息。此外,端口使用 8081(随意,本地未被使用的端口即可)。

      server.port=8081
      ################ H2 数据库 基础配置 ##############
      spring.datasource.driverClassName=org.h2.Driver
      spring.datasource.url=jdbc:h2:~/user
      spring.datasource.username=sa
      spring.datasource.password=
      spring.jpa.database=h2
      spring.jpa.hibernate.ddl-auto=update
      spring.h2.console.path=/h2-console
      spring.h2.console.enable=true

      3 创建测试类

      3.1 user 实体

      建立简单数据操作实体 User。

      import lombok.Data;
      import lombok.NoArgsConstructor;
      import javax.persistence.*;
      /**
       * @Author: prepared
       * @Date: 2022/8/29 21:40
       */
      @Data
      @NoArgsConstructor
      @Table(name = "t_user")
      @Entity
      public class User {
          @Id
          @GeneratedValue(strategy = GenerationType.AUTO)
          private Long id;
          private String userName;
          private int age;
          private String sex;
          public User(String userName, int age, String sex) {
              this.userName = userName;
              this.age = age;
              this.sex = sex;
          }
      }

      3.2 UserRepository

      数据模型层使用 JPA 框架。

      import com.prepared.user.domain.User;
      import org.springframework.data.jpa.repository.JpaRepository;
      import org.springframework.stereotype.Repository;
      /**
       * @Author: prepared
       * @Date: 2022/8/29 21:45
       */
      @Repository
      public interface UserRepository extends JpaRepository<User, Long> {
      }

      3.3 UserService

      service 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。所有接口返回 Mono/Flux 对象。

      最佳实践:所有的第三方接口、IO 耗时比较长的操作都可以放在 Mono 对象中。

      doOnError 监控异常情况;

      doFinally 监控整体执行情况,如:耗时、调用量监控等。

      import com.prepared.user.dao.UserRepository;
      import com.prepared.user.domain.User;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.stereotype.Service;
      import reactor.core.publisher.Mono;
      import javax.annotation.Resource;
      import java.util.List;
      /**
       * @Author: prepared
       * @Date: 2022/8/29 21:45
       */
      @Service
      public class UserService {
          private Logger logger = LoggerFactory.getLogger(UserService.class);
          @Resource
          private UserRepository userRepository;
          public Mono<Boolean> save(User user) {
              long startTime = System.currentTimeMillis();
              return Mono.fromSupplier(() -> {
                          return userRepository.save(user) != null;
                      })
                      .doOnError(e -> {
                          // 打印异常日志&增加监控(自行处理)
                          logger.error("save.user.error, user={}, e", user, e);
                      })
                      .doFinally(e -> {
                          // 耗时 & 整体健康
                          logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime);
                      });
          }
          public Mono<User> findById(Long id) {
              long startTime = System.currentTimeMillis();
              return Mono.fromSupplier(() -> {
                          return userRepository.getReferenceById(id);
                      }).doOnError(e -> {
                          // 打印异常日志&增加监控(自行处理)
                          logger.error("findById.user.error, id={}, e", id, e);
                      })
                      .doFinally(e -> {
                          // 耗时 & 整体健康
                          logger.info("findById.user.time={}, id={}", id, System.currentTimeMillis() - startTime);
                      });
          }
          public Mono<List<User>> list() {
              long startTime = System.currentTimeMillis();
              return Mono.fromSupplier(() -> {
                          return userRepository.findAll();
                      }).doOnError(e -> {
                          // 打印异常日志&增加监控(自行处理)
                          logger.error("list.user.error, e", e);
                      })
                      .doFinally(e -> {
                          // 耗时 & 整体健康
                          logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);
                      });
          }
        public Flux<User> listFlux() {
              long startTime = System.currentTimeMillis();
              return Flux.fromIterable(userRepository.findAll())
                      .doOnError(e -> {
                          // 打印异常日志&增加监控(自行处理)
                          logger.error("list.user.error, e", e);
                      })
                      .doFinally(e -> {
                          // 耗时 & 整体健康
                          logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);
                      });
          }
      }

      3.4 UserController

      controller 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。

      list 方法还有另外一种写法,这就涉及到 Mono 和 Flux 的不同了。

      返回List可以使用Mono<List<User>> ,也可以使用 Flux<User>

      • Mono<T> 是一个特定的 Publisher<T>,最多可以发出一个元素
      • Flux<T> 是一个标准的 Publisher<T>,表示为发出 0 到 N 个元素的异步序列
      import com.prepared.user.domain.User;
      import com.prepared.user.service.UserService;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
      import reactor.core.publisher.Mono;
      import javax.annotation.Resource;
      import java.util.ArrayList;
      import java.util.List;
      /**
       * @Author: prepared
       * @Date: 2022/8/29 21:47
       */
      @RestController
      public class UserController {
          @Resource
          private UserService userService;
          @RequestMapping("/add")
          public Mono<Boolean> add() {
              User user = new User("xiaoming", 10, "F");
              return userService.save(user) ;
          }
          @RequestMapping("/list")
          public Mono<List<User>> list() {
              return userService.list();
          }
      }
          @RequestMapping("/listFlux")
          public Flux<User> listFlux() {
              return userService.listFlux();
          }

      3.5 SpringReactorApplication 添加注解支持

      Application 启动类添加注解 @EnableJpaRepositories

      import org.springframework.boot.SpringApplication;
      import org.springframework.boot.autoconfigure.SpringBootApplication;
      import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
      /**
       * Hello world!
       */
      @SpringBootApplication
      @EnableJpaRepositories
      public class SpringReactorApplication {
          public static void main(String[] args) {
              SpringApplication.run(SpringReactorApplication.class, args);
          }
      }

      测试

      启动项目,访问 localhost:8081/add,正常返回 true。

      查询所有数据,访问localhost:8081/list,可以看到插入的数据,已经查询出来了。PS:我这里执行了多次 add,所以有多条记录。

      后台日志:

      2022-09-05 20:13:17.385  INFO 15696 --- [nio-8082-exec-2] com.prepared.user.service.UserService    : list.user.time=181,

      执行了 UserService list() 方法的 doFinnally 代码块,打印耗时日志。

      总结

      响应式编程的优势是不会阻塞。那么正常我们的代码中有哪些阻塞的操作呢?

      • Future 的 get() 方法;
      • Reactor 中的 block() 方法,subcribe() 方法,所以在使用 Reactor 的时候,除非编写测试代码,否则不要直接调用以上两个方法;
      • 同步方法调用,所以高并发情况下,会使用异步调用(如Future)来提升响应速度

      以上就是Spring Boot 整合 Reactor实例详解的详细内容,更多关于Spring Boot 整合 Reactor的资料请关注自由互联其它相关文章!

      上一篇:Java 将Excel转为UOS的操作方法
      下一篇:没有了
      网友评论