作者:bromine
链接:https://www.jianshu.com/p/411...
來源:简书
著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版。
Swoft Github: https://github.com/swoft-clou...
前言
Swoft提供了一个自建RPC(远程方法调用)实现,让你可以方便的调用其他Swoft上的服务。
RPC服务端的初始化
RPC有两种启动方式Http伴随启动和RPC单独启动。值得一提的是目前swoole的tcp服务即RPC服务,暂没有其他的tcp服务功能,所以基本上tcp相关的配置指代的就是RPC。
Http伴随启动
Swoft 的 RPC 服务在Http服务启动时候伴随启动
//Swoft\Http\Server\Http\HttpServer.php /** * Http Server */ class HttpServer extends AbstractServer /** * Start Server * * @throws \Swoft\Exception\RuntimeException */ public function start() { //code ... //根据.env配置文件Server区段的TCPABLE字段决定是否启动RPC服务 if ((int)$this->serverSetting['tcpable'] === 1) { $this->registerRpcEvent(); } //code .... } }
Swoole监听
初始化流程即根据相关注解注册一个swoole监听
//Swoft\Http\Server\Http\HttpServer.php /** * Register rpc event, swoft/rpc-server required * * @throws \Swoft\Exception\RuntimeException */ protected function registerRpcEvent() { //含有@SwooleListener且type为SwooleEvent::TYPE_PORT的Bean,即RpcEventListener $swooleListeners = SwooleListenerCollector::getCollector(); if (!isset($swooleListeners[SwooleEvent::TYPE_PORT][0]) || empty($swooleListeners[SwooleEvent::TYPE_PORT][0])) { throw new RuntimeException("Please use swoft/rpc-server, run 'composer require swoft/rpc-server'"); } //添加swoole RPC相关的tcp监听端口,使用的是.env文件中的TCP区段配置 $this->listen = $this->server->listen($this->tcpSetting['host'], $this->tcpSetting['port'], $this->tcpSetting['type']); $tcpSetting = $this->getListenTcpSetting(); $this->listen->set($tcpSetting); //根据RpcEventListener的相关注解添加监听处理句柄 $swooleRpcPortEvents = $swooleListeners[SwooleEvent::TYPE_PORT][0]; $this->registerSwooleEvents($this->listen, $swooleRpcPortEvents); }
由于是初版,根据@SwooleListener
获取RPC监听Bean的相关处理暂时还有点生硬。
目前swoft中type为SwooleEvent::TYPE_PORT
的@SwooleListener
只有RpcEventListener
一个,如果添加了同类Bean容易出问题,稳定版出的时候应该会有相关优化。
RPC单独启动
入口从Swoft\Http\Server\Command\ServerCommand
换成Swoft\Rpc\Server\Command\RpcCommand
,流程和Http大同小异,就是swoole的设定监听,仅仅是少了HTTP相关的监听接口和事件而已,此处不再赘述。
RPC请求处理
RPC服务器和HTTP服务器的区别仅仅在于与客户端交互报文格式和报文所在的网络层(Swoft的RPC基于TCP层次),运行原理基本相通,都是路由,中间件,RPC Service(对应Http的Controller),你完全可以以Http服务的思路去理解他。
Swoole的RPC-TCP监听设置好后,RPC服务端就可以开始接受请求了。RpcEventListener
的负责的工作仅仅是把收到的数据转发给\Swoft\Rpc\Server\ServiceDispatcher
分发。Dispatcher
会将请求传递给各个Middleware中间件,最终最终传递给HandlerAdapterMiddleware
处理。
PackerMiddleware
PackerMiddleware 是RPC中比较重要的一个中间件,负责将TCP请求中数据流解包和数据流封包。
<?php //Swoft\Rpc\Server\Middleware.PackerMiddleware namespace Swoft\Rpc\Server\Middleware; /** * service packer * * @Bean() */ class PackerMiddleware implements MiddlewareInterface { /** * packer middleware * * @param \Psr\Http\Message\ServerRequestInterface $request * @param \Psr\Http\Server\RequestHandlerInterface $handler * * @return \Psr\Http\Message\ResponseInterface */ public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface { //获取servicePacker Bean(\Swoft\Rpc\Packer\ServicePacker)用于字符串解包封包 $packer = service_packer(); $data = $request->getAttribute(self::ATTRIBUTE_DATA); $data = $packer->unpack($data); // 触发一个RpcServerEvent::BEFORE_RECEIVE事件,默认只有一个用于添加请求上下文信息的BeforeReceiveListener // 利用中间件触发流程关键事件的做法耦合有点高,猜测以后会调整 App::trigger(RpcServerEvent::BEFORE_RECEIVE, null, $data); //替换解包后的解包到Request中,提供给后续中间件和Handler使用 $request = $request->withAttribute(self::ATTRIBUTE_DATA, $data); /* @var \Swoft\Rpc\Server\Rpc\Response $response */ $response = $handler->handle($request); //为Response封包返回给RPC客户端 $serviceResult = $response->getAttribute(HandlerAdapter::ATTRIBUTE); $serviceResult = $packer->pack($serviceResult); return $response->withAttribute(HandlerAdapter::ATTRIBUTE, $serviceResult); } }
RouterMiddleware
RouterMiddleware负责根据RPC请求的method,version,interface 获取处理的RPC服务类,充当了路由的作用
<?php //Swoft\Rpc\Server\Middleware\RouterMiddleware.php /** * Service router * * @Bean() */ class RouterMiddleware implements MiddlewareInterface { /** * get handler from router * * @param \Psr\Http\Message\ServerRequestInterface $request * @param \Psr\Http\Server\RequestHandlerInterface $handler * * @return \Psr\Http\Message\ResponseInterface */ public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface { // service data $data = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA); $method = $data['method']??""; $version = $data['version']??""; $interface = $data['interface']??""; /* @var \Swoft\Rpc\Server\Router\HandlerMapping $serviceRouter */ $serviceRouter = App::getBean('serviceRouter'); //路由匹配,即向Swoft\Rpc\Server\Router\HandlerMapping->$routes获取RPC服务信息 $serviceHandler = $serviceRouter->getHandler($interface, $version, $method); // deliver service data $request = $request->withAttribute(self::ATTRIBUTE, $serviceHandler); return $handler->handle($request); } }
Swoft启动阶段会扫描并初始化注解信息(参考注解章节),注解初始化完毕后会触发一个AppEvent::APPLICATION_LOADER
事件,此时会将来自@Service的所有RPC的路由信息注册到Swoft\Rpc\Server\Router\HandlerMapping->$routes
中,用于serviceRouter Bean
的路由匹配。
HandlerAdapterMiddleware
HandlerAdapterMiddleware
最终转发请求给HandlerAdapter
处理,HandlerAdapter会使用刚刚RouterMiddleware匹配到的服务类信息转发请求并封装Response最终返回给ServiceDispatcher,ServiceDispatcher会返回TCP流给客户端然后结束本次请求。
<?php //Swoft\Rpc\Server\Router\HandlerAdapter.php /** * Service handler adapter * @Bean("serviceHandlerAdapter") */ class HandlerAdapter implements HandlerAdapterInterface { /** * Execute service handler * * @param \Psr\Http\Message\ServerRequestInterface $request * @param array $handler * @return Response */ public function doHandler(ServerRequestInterface $request, array $handler): Response { // RPC方法的各个参数 $data = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA); $params = $data['params'] ?? []; //路由解析出来的,处理该请求的服务Bean和方法 list($serviceClass, $method) = $handler; $service = App::getBean($serviceClass); // execute handler with params $response = PhpHelper::call([$service, $method], $params); $response = ResponseHelper::formatData($response); // 构造Response返回客户端 if (! $response instanceof Response) { $response = (new Response())->withAttribute(self::ATTRIBUTE, $response); } return $response; } }
RPC客户端的实现
在Bean的属性中声明@Reference
,Swoft即会根据@var
声明的类型注入相应的RPC客户端实例。
/** * @Reference(name="useraaaaaa") * * @var DemoInterface */ private $demoService;
依赖注入的实现会专门另外用一篇文章单独解释,这里先看看RPC客户端的相关代码。
远程代理
namespace Swoft\Rpc\Client\Service; /** * The proxy of service */ class ServiceProxy { /** * @param string $className * @param string $interfaceClass */ public static function loadProxyClass(string $className, string $interfaceClass) { $reflectionClass = new \ReflectionClass($interfaceClass); $reflectionMethods = $reflectionClass->getMethods(\ReflectionMethod::IS_PUBLIC); $template = "class $className extends \\Swoft\\Rpc\\Client\\Service implements {$interfaceClass} {"; //\Swoft\Rpc\Client\Service::class // the template of methods $template .= self::getMethodsTemplate($reflectionMethods); $template .= "}"; eval($template); } //code ... }
和AOP一样,原理一样是使用了动态代理,更具体的说法是动态远程代理。
RPC动态客户端类实现了客户端声明的Interface类型(如DemoInterface)并继承了\Swoft\Rpc\Client\Service
类。
动态类的实现很简单,对于接口显式声明的方法,实际上都是调用\Swoft\Rpc\Client\Service->call()
方法。
interface DemoInterface { /** * @param array $ids * @return array */ public function getUsers(array $ids); }
class 动态生成RPC客户端类 extends \Swoft\Rpc\Client\Service implements \App\Lib\DemoInterface { public function getUsers ( array $ids ) { $params = func_get_args(); return $this->call('getUsers', $params); } //code ... }
对于自动生成的defer方法,则是通过魔术方法__call()
,调用\Swoft\Rpc\Client\Service->deferCall()
/** * @param string $name * @param array $arguments * * @return ResultInterface * @throws RpcClientException */ function __call(string $name, array $arguments) { $method = $name; $prefix = self::DEFER_PREFIX;//'defer' if (strpos($name, $prefix) !== 0) { throw new RpcClientException(sprintf('the method of %s is not exist! ', $name)); } if ($name == $prefix) { $method = array_shift($arguments); } elseif (strpos($name, $prefix) === 0) { $method = lcfirst(ltrim($name, $prefix)); } return $this->deferCall($method, $arguments); }
我们这里只看具有代表性的call()
方法,deferCall()
大致相同。
RPC客户端动态类的本质是将客户端的参数和接口信息根据Swoft自己的格式传递给RPC服务端,然后将服务器返回的数据解包取出返回值返回给RPC的调用者,对外伪装成一个普通的对象,屏蔽远程调用操作。
// Swoft\Rpc\Client\Service.php /** * Do call service * * @param string $func * @param array $params * * @throws \Throwable * @return mixed */ public function call(string $func, array $params) { $profileKey = $this->interface . '->' . $func; //根据@reference的fallback属性获取降级处理句柄,在RPC服务调用失败的时候可以会使用fallback句柄代替 $fallback = $this->getFallbackHandler($func); try { $connectPool = $this->getPool(); $circuitBreaker = $this->getBreaker(); /* @var $client AbstractServiceConnection */ $client = $connectPool->getConnection(); //数据封包,和RPC服务端一致 $packer = service_packer(); $type = $this->getPackerName(); $data = $packer->formatData($this->interface, $this->version, $func, $params); $packData = $packer->pack($data, $type); //通过熔断器调用接口 $result = $circuitBreaker->call([$client, 'send'], [$packData], $fallback); if ($result === null || $result === false) { return null; } //和defercall不一致这里直接收包,解包 App::profileStart($profileKey); $result = $client->recv(); App::profileEnd($profileKey); $connectPool->release($client); App::debug(sprintf('%s call %s success, data=%', $this->interface, $func, json_encode($data, JSON_UNESCAPED_UNICODE))); $result = $packer->unpack($result); $data = $packer->checkData($result); } catch (\Throwable $throwable) { if (empty($fallback)) { throw $throwable; } //RPC调用失败则调用降级句柄,代替实际RPC服务直接返回 $data = PhpHelper::call($fallback, $params); } return $data; }
熔断器
熔断器的swoft-RPC的另一重要概念,RPC的所有请求都通过熔断器发送。
熔断器使用状态模式实现,熔断器有开启,半开,关闭 3种状态,不同状态下熔断器会持有不同的状态实例,状态根据RPC调用情况切换,熔断器根据持有状态实例的不同,行为也有所不同。
熔断器关闭状态策略
<?php //Swoft\Sg\Circuit\CloseState.php /** * close状态的熔断器,对所有RPC调用都通过协程客户端发送到RPC服务器 * 关闭状态及切换 * 1. 重置failCounter=0 successCount=0 * 2. 操作失败,failCounter计数 * 3. 操作失败一定计数,切换为open开启状态 */ class CloseState extends CircuitBreakerState { /** * 熔断器调用 * * @param mixed $callback 回调函数 * @param array $params 参数 * @param mixed $fallback 失败回调 * * @return mixed 返回结果 */ public function doCall($callback, $params = [], $fallback = null) { list($class, $method) = $callback; try { if ($class == null) { throw new \Exception($this->circuitBreaker->serviceName . "服务,连接建立失败(null)"); } if ($class instanceof Client && $class->isConnected() == false) { throw new \Exception($this->circuitBreaker->serviceName . "服务,当前连接已断开"); } //调用swoole协程客户端的send()方法发送数据 $data = $class->$method(...$params); } catch (\Exception $e) { //递增失败计数 if ($this->circuitBreaker->isClose()) { $this->circuitBreaker->incFailCount(); } App::error($this->circuitBreaker->serviceName . "服务,当前[关闭状态],服务端调用失败,开始服务降级容错处理,error=" . $e->getMessage()); //RPC调用失败则使用降级接口 $data = $this->circuitBreaker->fallback($fallback); } //失败次数过线则切换状态 $failCount = $this->circuitBreaker->getFailCounter(); $switchToFailCount = $this->circuitBreaker->getSwitchToFailCount(); if ($failCount >= $switchToFailCount && $this->circuitBreaker->isClose()) { App::trace($this->circuitBreaker->serviceName . "服务,当前[关闭状态],服务失败次数达到上限,开始切换为开启状态,failCount=" . $failCount); $this->circuitBreaker->switchToOpenState(); } App::trace($this->circuitBreaker->serviceName . "服务,当前[关闭状态],failCount=" . $this->circuitBreaker->getFailCounter()); return $data; } }
熔断器开启状态策略
<?php \\Swoft\Sg\Circuit\OpenState .php; /** * open状态的熔断器,对所有RPC调用都使用降级句柄代替 * 开启状态及切换(open) * 1. 重置failCounter=0 successCounter=0 * 2. 请求立即返回错误响应 * 3. 定时器一定时间后切换为半开状态(open) */ class OpenState extends CircuitBreakerState { /** * 熔断器调用 * * @param mixed $callback 回调函数 * @param array $params 参数 * @param mixed $fallback 失败回调 * * @return mixed 返回结果 */ public function doCall($callback, $params = [], $fallback = null) { $data = $this->circuitBreaker->fallback(); App::trace($this->getServiceName() . "服务,当前[开启状态],执行服务fallback服务降级容错处理"); $nowTime = time(); if ($this->circuitBreaker->isOpen() && $nowTime > $this->circuitBreaker->getSwitchOpenToHalfOpenTime() ) { $delayTime = $this->circuitBreaker->getDelaySwitchTimer(); // swoole定时器不是严格的,3s容错时间 ,定时切换状态的半开 $switchToHalfStateTime = $nowTime + ($delayTime / 1000) + 3; App::getTimer()->addAfterTimer('openState', $delayTime, [$this, 'delayCallback']); $this->circuitBreaker->setSwitchOpenToHalfOpenTime($switchToHalfStateTime); App::trace($this->getServiceName() . "服务,当前[开启状态],创建延迟触发器,一段时间后状态切换为半开状态"); } return $data; } }
熔断器半开状态策略
半开熔断器是熔断器关闭状态和熔断器开启状态的过度状态,半开熔断器的所有RPC调用都是加锁的,连续成功或者连续失败到阈值后会切换到关闭状态或者开启状态,代码类似,此处不再累述,有兴趣的读者可以自行研究。
Swoft源码剖析系列目录: https://segmentfault.com/a/11...