官方默认GatewayWorker只提供单一格式通信数据的分布式服务框架,Workerman是其内核。
Workerman巧妙的通过PCNTL实现多子进程服务,让每个子进程都可以监听同一个服务地址并进行竞争服务。因windows系统不支持PCNTL,所以在windows系统下只能通过多进程处理(且各进程无法监听同一服务地址),以下均以PCNTL运行模式进行说明。
官方地址:https://www.workerman.net/gateway-worker
GatewayWorker分布式结构
GatewayWorker允许外部扩展自定义协议,但默认不支持HTTP协议,这缘于GatewayWorker分布式通信结构,使得通信数据格式复杂的协议处理困难。
GatewayWorker通过内置三种服务:
- 注册中心(\Workerman\Worker\Register)
- 网关服务(\Workerman\Worker\Gateway)
- 业务服务(\Workerman\Worker\BusinessWorker)
官方原理说明:https://www.workerman.net/doc/gateway-worker/principle.html
注册中心
用来存储所有网关子进程与业务服务连接的内部连接服务地址,当有业务服务连接过来时就通知给业务服务连接网关的地址进行连接。
允许多个注册中心同时运行,以实现高可用。每个注册中心仅单进程运行,因为多进程各自是独立处理竞争得来的连接信息,会导致各进程竞争来得的网关信息不同,使业务服务获取的网关地址数据也不同,最终网关服务与业务服务连接不全服务性能受限。
监听
内部服务地址:使用内置text协议,仅限内部网关服务和业务服务连接。监听地址一般是内网或本地地址。
连接
无
网关服务
网关是用户与业务服务之间的桥梁,将用户请求的数据转发给某个业务服务子进程处理,再由业务服务子进程回复或通知网关转发给用户的数据。
网关服务建议使用多子进程模式,增加并发能力,同时各子进程共享了对外服务地址(通过竞争接受用户连接),但各子进程会独立创建一个对内服务地址(地址相同端口号会递增)。
监听
对外服务地址:给用户连接的,协议自行定义,一个网关地址在内部所有子进程内是共享连接的,但接受连接存在各子进程竞争。此监听是各子进程共享。
对内服务地址:给内部业务服务连接的,使用内置GatewayProtocol(\GatewayWorker\Protocols\GatewayProtocol)协议,此协议收发均会携带用户连接信息和数据,监听地址一般是内网或本地地址。当业务服务接收后进行解析后即可知道用户信息,处理完后能通知网关回复指定用户。此监听是各子进程独享。
连接
注册中心:使用内置text协议,主要将内部服务地址(网关节点)存放到注册中,用于业务服务获取并连接。连接是各子进程独享。
业务服务
业务服务只提供处理并无监听功能,均通过连接完成业务处理。建议使用多子进程模式,增加并发能力。所有连接是各子进程独享。
监听
无
连接
注册中心:使用内置text协议,主要获取内部服务地址(网关节点)进行连接。
网关节点:使用内置GatewayProtocol协议,将从注册中心获取的地址进行逐个连接并保持状态,等待网关通知业务处理数据。
如何支持HTTP协议
从GatewayWorker结构可以看的出GatewayProtocol协议是将网关服务收到的用户数据进行再打包转发给业务服务处理。
像HTTP这类协议会在网关收到后自动进行解码生成请求(\Workerman\Protocols\Http\Request)对象,再交给GatewayProtocol协议转发给业务服务,而GatewayProtocol协议只接收字符串转打包并不知道该如何处理Request对象。
而Websocket协议就不同,虽然Websocket协议有编码和解码过程,但最终解码与编码均是字符串,刚好匹配GatewayProtocol协议。
需要GatewayWorker支持HTTP协议最简单的办法是将新增一个HTTP网关协议,将HTTP解码与编码放到业务服务层处理即可衔接(其它类似协议也可以这样处理)。
示例代码如下,注意文件放置目录需要与命令空间相符。
HTTP网关协议代码:
<?php
/*
* Http协议网关处理
*/
namespace Protocols;
use Workerman\Connection\TcpConnection;
class HttpGateway extends \Workerman\Protocols\Http {
/**
* Http encode.
*
* @param string|Response $response
* @param TcpConnection $connection
* @return string
*/
public static function encode($response, TcpConnection $connection) {
return $response;
}
/**
* Http decode.
*
* @param string $recv_buffer
* @param TcpConnection $connection
* @return \Workerman\Protocols\Http\Request
*/
public static function decode($recv_buffer, TcpConnection $connection) {
return $recv_buffer;
}
}
业务服务事件处理代码
<?php
/*
* 业务处理事件
*/
namespace App;
use Workerman\Protocols\Http;
use GatewayWorker\Lib\Context;
use GatewayWorker\Lib\Gateway;
use GatewayWorker\BusinessWorker;
use Workerman\Protocols\Http\Request;
use Workerman\Protocols\Http\Response;
class Event {
/**
* @var BusinessWorker 当前业务处理服务实例
*/
public static $businessWorker;
/**
* @var array 静态文件类型集合
*/
protected static $mimeTypes = [];
/**
* 业务服务开始回调
* @param BusinessWorker $businesswork
*/
public static function onWorkerStart($businesswork) {
static::$businessWorker = $businesswork;
// 加载静态文件响应类型
$mimeFile = __DIR__ . '/../vendor/workerman/workerman/Protocols/Http/mime.types';
if (file_exists($mimeFile) && is_readable($mimeFile)) {
foreach (file($mimeFile, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES) as $line) {
$data = preg_split('/\s+/', $line);
if (count($data)) {
$type = array_shift($data);
foreach ($data as $ext) {
static::$mimeTypes[strtolower($ext)] = $type;
}
}
}
}
}
/**
* 业务服务结束回调
* @param BusinessWorker $businesswork
*/
public static function onWorkerStop($businesswork) {
}
/**
* 当客户端连接时触发
* 如果业务不需此回调可以删除onConnect
*
* @param int $client_id 连接id
*/
public static function onConnect($client_id) {
}
/**
* 当用户断开连接时触发
* @param int $client_id 连接id
*/
// 注意, 此时socket应该已经断开了
public static function onClose($client_id) {
}
/**
* 当客户端发来消息时触发
* @param int $cid 连接id
* @param mixed $recv_buffer 具体消息
*/
public static function onMessage($cid, $recv_buffer) {
$address_data = Context::clientIdToAddress($cid);
if ($address_data) {
$address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
$connection = Event::$businessWorker->gatewayConnections[$address];
$request = Http::decode($recv_buffer, $connection);
// 这里可以进行业务处理了
$response = static::requestStatic($request);
if (!$response) {
$response = static::requestBusiness($request);
}
if (!$response) {
$response = new Response(404, [], 'Not Found');
} elseif (isset($response->file)) {
$file = $response->file['file'];
if (file_exists($file) && is_readable($file)) {
// 发送文件
static::sendFile($response);
return;
}
$response = new Response(403, null, '403 Forbidden');
}
Gateway::sendToCurrentClient(Http::encode($response, $connection));
} else {
Gateway::closeCurrentClient();
}
}
/**
* 请求静态文件
* @param Request $request
* @return Response|null
*/
protected static function requestStatic(Request $request) {
$file = __DIR__ . '/../public/' . $request->path();
// 取出文件名后缀
$filename = basename($file);
$index = strrpos($filename, '.');
if ($index === false) {
return;
}
$ext = strtolower(substr($filename, $index + 1));
if (file_exists($file) && isset(static::$mimeTypes[$ext])) {
$response = new Response(200, ['Content-Type' => static::$mimeTypes[$ext]]);
$response->file = [
'file' => $file,
'offset' => 0,
'length' => 0,
];
return $response;
}
}
/**
* 请求业务处理
* @param Request $request
* @return Response|null
*/
protected static function requestBusiness(Request $request) {
$data = array_filter(explode('/', $request->path()));
switch (count($data)) {
case 0:
$controller = 'IndexController';
$action = 'index';
break;
case 1:
$controller = ucfirst(strtolower($data[0])) . 'Controller';
$action = 'index';
break;
default:
$action = array_pop($data);
$data = array_map(function($item) {
return ucfirst(strtolower($item));
}, $data);
$controller = implode('\\', $data) . 'Controller';
break;
}
$controller = "\\App\\Controllers\\{$controller}";
if (class_exists($controller) && method_exists($controller, $action)) {
return (new $controller())->$action($request);
}
}
/**
* 发送文件
* @param Response $response
*/
protected static function sendFile(Response $response) {
$file = $response->file['file'];
$offset = $response->file['offset'];
$length = $response->file['length'];
clearstatcache();
$file_size = (int) \filesize($file);
$body_len = $length > 0 ? $length : $file_size - $offset;
$response->withHeaders(array(
'Content-Length' => $body_len,
'Accept-Ranges' => 'bytes',
));
if ($offset || $length) {
$offset_end = $offset + $body_len - 1;
$response->header('Content-Range', "bytes $offset-$offset_end/$file_size");
}
$handle = \fopen($file, 'r');
if ($offset !== 0) {
\fseek($handle, $offset);
}
$size = 1024 * 1024;
$buffer = fread($handle, $size);
Gateway::sendToCurrentClient((string) $response . $buffer);
while (!feof($handle)) {
$buffer = fread($handle, $size);
Gateway::sendToCurrentClient($buffer);
}
fclose($handle);
}
}
网关地址配置代码
<?php
// 前段代码省略
// 指定网关地址
$gateway = new Gateway('HttpGateway://0.0.0.0:80');
// 后段代码省略
// 一定要网关发送心跳包功能
业务服务事件处理指定
<?php
// 前段代码省略
$worker = new BusinessWorker();
// 事件处理
$worker->eventHandler = \App\Event::class;
// 后段代码省略
// 一定要网关发送心跳包功能
增加控制器
<?php
/*
* 控制器
*/
namespace App\Controllers;
class IndexController extends Controller {
public function index() {
return __METHOD__;
}
}
静态文件需要放在项目根目录的 public/ 目录下才有效。
启动验证(能打印 App\Controllers\IndexController::index 表示成功)
curl http://127.0.0.1/
特别注意:HTTP是短连接不能使用GatewayWorker\Lib\Gateway工具,同时也不能直接使用$_SESSION全局变量,需要处理session可通过Workerman\Protocols\Http\Request处理。