目录 1 引入 pom 包 2 配置说明 2.1 限流 ratelimiter 2.2 重试 retry 2.3 超时 TimeLimiter 2.4 断路器 circuitbreaker 2.5 壁仓 bulkhead 2.5.1 SemaphoreBulkhead 2.5.2 FixedThreadPoolBulkhead 3 使用 3.1 配置 3.2 使用注解实
目录
- 1 引入 pom 包
- 2 配置说明
- 2.1 限流 ratelimiter
- 2.2 重试 retry
- 2.3 超时 TimeLimiter
- 2.4 断路器 circuitbreaker
- 2.5 壁仓 bulkhead
- 2.5.1 SemaphoreBulkhead
- 2.5.2 FixedThreadPoolBulkhead
- 3 使用
- 3.1 配置
- 3.2 使用注解实现
1 引入 pom 包
<dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-all</artifactId> </dependency> <dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-spring-boot2</artifactId> </dependency>
2 配置说明
2.1 限流 ratelimiter
两个限流配置:backendA 1s 中最多允许 10 次请求;
backendB 每 500ms 最多允许 6 次请求。
resilience4j.ratelimiter: instances: backendA: limitForPeriod: 10 limitRefreshPeriod: 1s timeoutDuration: 10ms registerHealthIndicator: true eventConsumerBufferSize: 100 backendB: limitForPeriod: 6 limitRefreshPeriod: 500ms timeoutDuration: 3s
2.2 重试 retry
注意指定需要重试的异常,不是所有的异常重试都有效。比如 DB 相关校验异常,如唯一约束等,重试也不会成功的。
重试配置:
resilience4j.retry: instances: backendA: maxAttempts: 3 waitDuration: 10s enableExponentialBackoff: true exponentialBackoffMultiplier: 2 retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException backendB: maxAttempts: 3 waitDuration: 10s retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException
2.3 超时 TimeLimiter
超时配置:
resilience4j.timelimiter: instances: backendA: timeoutDuration: 2s cancelRunningFuture: true backendB: timeoutDuration: 1s cancelRunningFuture: false
超时配置比较简单,主要是配置 timeoutDuration 也就是超时的时间。
cancelRunningFuture 的意思是:是否应该在运行的 Future 调用 cancel 去掉调用。
2.4 断路器 circuitbreaker
断路器有几种状态:关闭、打开、半开。注意:打开,意味着不能访问,会迅速失败。
CircuitBreaker 使用滑动窗口来存储和汇总调用结果。您可以在基于计数的滑动窗口和基于时间的滑动窗口之间进行选择。基于计数的滑动窗口聚合最后 N 次调用的结果。基于时间的滑动窗口聚合了最后 N 秒的调用结果。
断路器配置:
resilience4j.circuitbreaker: instances: backendA: // 健康指标参数,非断路器属性 registerHealthIndicator: true slidingWindowSize: 100
值 0 表示断路器将在 HalfOpen 状态下无限等待,直到所有允许的调用都已完成。
2.5 壁仓 bulkhead
resilience4j 提供了两种实现壁仓的方法:
SemaphoreBulkhead
使用 Semaphore 实现FixedThreadPoolBulkhead
使用有界队列和固定线程池实现
resilience4j.bulkhead: instances: backendA: maxConcurrentCalls: 10 backendB: maxWaitDuration: 10ms maxConcurrentCalls: 20 resilience4j.thread-pool-bulkhead: instances: backendC: maxThreadPoolSize: 1 coreThreadPoolSize: 1 queueCapacity: 1
2.5.1 SemaphoreBulkhead
2.5.2 FixedThreadPoolBulkhead
3 使用
3.1 配置
在 application.yml 文件中添加以下 resilience4j 配置:
resilience4j.circuitbreaker: instances: backendA: registerHealthIndicator: true slidingWindowSize: 100 resilience4j.retry: instances: backendA: maxAttempts: 3 waitDuration: 10s enableExponentialBackoff: true exponentialBackoffMultiplier: 2 retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException backendB: maxAttempts: 3 waitDuration: 10s retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException resilience4j.bulkhead: instances: backendA: maxConcurrentCalls: 10 backendB: maxWaitDuration: 10ms maxConcurrentCalls: 20 resilience4j.thread-pool-bulkhead: instances: backendC: maxThreadPoolSize: 1 coreThreadPoolSize: 1 queueCapacity: 1 resilience4j.ratelimiter: instances: backendA: limitForPeriod: 10 limitRefreshPeriod: 1s timeoutDuration: 10ms registerHealthIndicator: true eventConsumerBufferSize: 100 backendB: limitForPeriod: 6 limitRefreshPeriod: 500ms timeoutDuration: 3s resilience4j.timelimiter: instances: backendA: timeoutDuration: 2s cancelRunningFuture: true backendB: timeoutDuration: 1s cancelRunningFuture: false
3.2 使用注解实现
直接在需要限流的方法上增加注解@RateLimiter
实现限流;增加注解@Retry
实现重试;增加注解 @CircuitBreaker
熔断;增加注解 @Bulkhead
实现壁仓。name 属性中分别填写限流器、重试、熔断、壁仓组件的名字。
@Bulkhead(name = "backendA") @CircuitBreaker(name = "backendA") @Retry(name = "backendA") @RateLimiter(name = "backendA") public Mono<List<User>> list() { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.findAll(); }).doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("list.user.error, e", e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime); }); } @Bulkhead(name = "backendA") @CircuitBreaker(name = "backendA")//最多支持10个并发量 @Retry(name = "backendA")//使用 backendA 重试器,如果抛出 IOException 会重试三次。 @RateLimiter(name = "backendA")// 限流 10 Qps public Mono<Boolean> save(User user) { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.save(user) != null; }) .doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("save.user.error, user={}, e", user, e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime); }); }
注意:以上所有组件,都支持自定义。
到此这篇关于Spring Boot Reactor 整合 Resilience4j详析的文章就介绍到这了,更多相关Spring Boot Reactor 内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!