过滤器
字面义上理解的过滤器类似下图,从一堆物品中筛选出符合条件的留下,不符合的丢弃。
GOF 职责链
GOF中有一种设计模式叫职责链,或者叫责任链,常规的UML图如下:
正统的职责链是将一个请求发给第一个接收者,接收者判断是否属于自己能处理的,如果能处理则执行操作并中止请求下发,流程到此为止。如果不能处理则将请求下发给下一个接收者一直到最后一个接收者。
变体职责链
上面提到正统的职责链有一个特点:当找到符合条件的执行者后流程中止并不会将请求继续下发给后续的执行者。这类场景比较适合比如审核操作,一个报销单由主管,经理,CTO一级一级的审批不能越权。
解耦合/细分
在实际项目中,往往会遇到非常复杂的业务场景,有可能是需要执行的方法特别多,也有可能是因为需要执行的方法有可能事先不知道,需要在运行时才能判断。如果将这些逻辑全部写在一个类或者一个方法中就会出现这样的问题:
耦合度高,一个方法或者一个类需要关联所有业务方法以及相关类不易扩展,需要执行的业务经常发生变化,如果每次变化都去修改统一的方法或者类,不符合开闭原则,维护成本非常高
所以就有了下图,一堆需要执行的方法发给第一个接收者,接收者判断哪些方法是自己可以执行的,有执行的就执行,然后无论是否有可执行的方法在处理完成后都将请求继续下发给后面的接收者。每个接收者完成自己负责的内容,多个接收者完成了复杂任务的分解。
RPC 过滤器
RPC过滤器与Spring MVC中的Filter作用基本相同,其中很大一个作用就是动态的给客户端或者是服务端增加切面功能,比如:
权限控制加密解密访问日志限流控制并发控制......
下图是我在RPC项目中实现过滤器机制的UML示意图
RpcFilter
定义一个过滤器接口,只包含一个invoke方法。
public interface RpcFilter<T> { <T> T invoke(RpcInvoker invoker, RpcInvocation invocation); }
RpcInvoker
这是RPC客户端以及服务端执行服务端方法或者是将客户端请求发送给服务端时需要调用的方法接口。
这个角色在Netty中也可以叫Handle,这个接口与上面的RpcFilter有点类似,只是在RPC框架中体现的角色不同而已,具体看UML图可知道两者关系。
public interface RpcInvoker { Object invoke(RpcInvocation invocation); }
RpcServerInvoker
服务端的一个执行者实现,包含两个核心功能:
构建职责链this.filterMap,是通过注解获取到的一组过滤器,此处不详细讲因为与本文关系不大RpcInvoker的invoker方法实际调用的是RpcFilter的方法,并将自身实例传递给RpcFilter,目的是构建职责链的关联关系
public RpcInvoker buildInvokerChain(final RpcInvoker invoker) { RpcInvoker last = invoker; List<RpcFilter> filters = Lists.newArrayList(this.filterMap.values()); if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i --) { final RpcFilter filter = filters.get(i); final RpcInvoker next = last; last = new RpcInvoker() { @Override public Object invoke(RpcInvocation invocation) { return filter.invoke(next, invocation); } }; } } return last; }
此处并没有考虑职责链的排序,可以通过过滤器的注解上增加排序数字来解决。目前我写的过滤器注解中并没有实现排序功能,可以增加一个order的属性,然后在需要指定顺序的过滤器上增加对应属性值来支持。
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface ActiveFilter { String[] group() default {}; String[] value() default {}; }
发请求给职责链
服务端在读取到客户端的请求后,首先通过构建职责链得到RpcInvoker,然后调用RpcInvoker的invoke方法将请求下发。
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage message) { this.executor.execute(new Runnable() { @Override public void run() { RpcInvoker rpcInvoker=.... RpcResponse response=(RpcResponse) rpcInvoker.invoke...... }
过滤器案例
一般HTTP请求在不同网络角色中处理请求的能力会呈现为一个漏斗型,越上层职责越轻,越往下层职责越重,所对应的就是越上层处理请求量越大,越下层处理请求量越小。比如负载均衡器只负责请求转发而不负责具体的任务执行,而后端的Service服务器会执行大量的IO操作或者是消耗cpu的计算任务等,所以这两者在处理请求的量上往往是数量级的。
当出现大量请求时,为了有效的保护后端服务的稳定性(尽量不出现宕机),除了横向扩展服务器外还可以通过一些软件手段缓解后端服务的压力,这就是通常说的限流,本文因为需要简单实现一个限制的过滤器,所以直接引用现成的限流算法:令牌桶。
下面是客户端请求限流的一个简单实现,客户端在给服务端发起请求之前需要获取令牌,如果获取到则发送请求,如果获取不到一直等待。当然为了防止死锁,可以调用带超时时间的获取令牌方法。
@ActiveFilter(group = {ConstantConfig.CONSUMER}) public class AccessLimitFilter implements RpcFilter { private final static Logger logger = LoggerFactory.getLogger(AccessLimitFilter.class); @Override public Object invoke(RpcInvoker invoker, RpcInvocation invocation) { logger.info("before acquire"); AccessLimitManager.acquire(); Object rpcResponse=invoker.invoke(invocation); logger.info("after acquire"); return rpcResponse; } static class AccessLimitManager{ private final static RateLimiter rateLimiter=RateLimiter.create(2); public static void acquire(){ rateLimiter.acquire(); } } }
实现的比较粗糙,桶的大小是写死的,应该实现为可配置型,后续抽空完善下。
本文源码
https://github.com/jiangmin168168/jim-framework
文中代码是依赖上述项目的,如果有不明白的可下载源码
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持易盾网络。