PHP版多进程服务器 workProcessNum = intval($config['workProcessNum']); } if ($this->workProcessNum < 1) { self::log(1, "child workProcessNum can not be less than 1"); throw new TException('child workProcessNum can not be less than 1'); }
workProcessNum = intval($config['workProcessNum']);
        }
        // NOTE(review): the signature and opening statements of this method
        // lie before the visible source; only the visible tail is maintained here.
        if ($this->workProcessNum < 1) {
            self::log(1, "child workProcessNum can not be less than 1");
            throw new TException('child workProcessNum can not be less than 1');
        }

        if (isset($config['maxWorkRequestNum'])) {
            $this->maxWorkRequestNum = intval($config['maxWorkRequestNum']);
        }
        if ($this->maxWorkRequestNum < 1) {
            self::log(1, "child maxWorkRequestNum can not be less than 1");
            throw new TException('child maxWorkRequestNum can not be less than 1');
        }

        if (isset($config['sendTimeoutSec'])) {
            $this->sendTimeoutSec = intval($config['sendTimeoutSec']);
        }
        if (isset($config['recvTimeoutSec'])) {
            $this->recvTimeoutSec = intval($config['recvTimeoutSec']);
        }

        if ($daemon) {
            // Detach first, then install the signal handlers in the surviving
            // process; log files and the pid file are only configured in daemon mode.
            $this->daemon();
            $this->registerSignalHandler();
            self::$logFiles = isset($config['logFiles']) && is_array($config['logFiles']) ? $config['logFiles'] : array();
            self::$pidFile = isset($config['pidFile']) ? $config['pidFile'] : '';
            declare(ticks=3);
        }

        $this->pid = posix_getpid();
        self::createPidFile($this->pid);
        self::log(0, "manage process({$this->pid}) has started");

        $this->transport_->listen();

        // Manager loop: workProcessNum counts the worker slots that still need
        // a process. Every successful fork consumes one slot and every reaped
        // child (see collectChildren) gives one back.
        while (!$this->stop_) {
            while ($this->workProcessNum > 0) {
                try {
                    $pid = pcntl_fork();
                    if ($pid > 0) {
                        $this->handleParent($pid, $this->workProcessNum);
                    } else if ($pid === 0) {
                        $this->pid = posix_getpid();
                        // handleChild() ends with exit(0) and does not return.
                        $this->handleChild($this->workProcessNum);
                    } else {
                        self::log(1, "Failed to fork");
                        throw new TException('Failed to fork');
                    }
                    $this->workProcessNum--;
                } catch (Exception $e) {
                    // The slot counter was not decremented, so retrying right
                    // away would spin in a tight loop while the system is out
                    // of resources. Leave the inner loop and let the sleep
                    // below act as a back-off before the next attempt.
                    break;
                }
            }

            $this->collectChildren();
            sleep(2);

            if (\Thrift\Server\TMultiProcessServer::$catchQuitSignal) {
                $this->stop();
            }
        }
    }

    /**
     * Number of connections the current process has accepted so far.
     *
     * @return int
     */
    public function getCurrentWorkRequestNum()
    {
        return $this->currentWorkRequestNum;
    }

    /**
     * Number of calls processed on the connection currently being served.
     *
     * @return int
     */
    public function getCurrentConnectCallNum()
    {
        return $this->currentConnectCallNum;
    }

    /**
     * Code run by the parent: remembers the freshly forked child.
     *
     * @param int $pid pid of the forked child
     * @param int $num process number (worker slot)
     * @return void
     */
    private function handleParent($pid, $num)
    {
        $this->childrens[$pid] = $num;
    }

    /**
     * Code run by the child: accepts connections and serves them until the
     * server is stopped, a quit signal was caught, or maxWorkRequestNum
     * connections have been handled. Always terminates the process.
     *
     * @param int $num process number (worker slot)
     * @return void
     */
    private function handleChild($num)
    {
        self::log(0, "child process($this->pid) has started");

        // The child list is inherited from the parent through fork(); clear it
        // so that stop() in this process does not signal sibling workers.
        $this->childrens = array();

        while (!$this->stop_) {
            $transport = null;
            try {
                $transport = $this->transport_->accept();
                if ($transport != null) {
                    // Timeouts are configured in seconds and scaled by 1000
                    // here (presumably the transport expects milliseconds).
                    $transport->setSendTimeout($this->sendTimeoutSec * 1000);
                    $transport->setRecvTimeout($this->recvTimeoutSec * 1000);

                    $this->currentWorkRequestNum++;
                    $this->currentConnectCallNum = 0;

                    $inputTransport = $this->inputTransportFactory_->getTransport($transport);
                    $outputTransport = $this->outputTransportFactory_->getTransport($transport);
                    $inputProtocol = $this->inputProtocolFactory_->getProtocol($inputTransport);
                    $outputProtocol = $this->outputProtocolFactory_->getProtocol($outputTransport);

                    while ($this->processor_->process($inputProtocol, $outputProtocol)) {
                        $this->currentConnectCallNum++;
                    }
                }
            } catch (TTransportException $e) {
                // Transport errors are deliberately ignored.
            } catch (Exception $e) {
                self::log(1, $e->getMessage().'('.$e->getCode().')');
            }

            // Close the connection on every path, including when processing
            // threw, so a failed request does not keep its socket open.
            if ($transport != null) {
                try {
                    @$transport->close();
                } catch (TTransportException $e) {
                    // Same policy as above: transport errors are ignored.
                } catch (Exception $e) {
                    self::log(1, $e->getMessage().'('.$e->getCode().')');
                }
            }

            if (\Thrift\Server\TMultiProcessServer::$catchQuitSignal) {
                $this->stop();
            }

            if ($this->currentWorkRequestNum >= $this->maxWorkRequestNum) {
                self::log(0, "child process($this->pid) has processe {$this->currentWorkRequestNum} requests will be exit");
                $this->stop();
                break;
            }
        }

        exit(0);
    }

    /**
     * Collects any children we may have and frees their worker slots so the
     * manager loop forks replacements.
     *
     * @return void
     */
    private function collectChildren()
    {
        foreach ($this->childrens as $pid => $num) {
            // > 0: the child exited and was reaped.
            //  -1: waitpid failed, so the pid can no longer be waited for;
            //      keeping it would lose the worker slot forever.
            //   0: the child is still running (WNOHANG).
            $result = pcntl_waitpid($pid, $status, WNOHANG);
            if ($result > 0 || $result === -1) {
                unset($this->childrens[$pid]);
                $this->workProcessNum++;
            }
        }
    }

    /**
     * Stops the server: closes the listening transport, raises the stop flag
     * and sends SIGTERM to every known child.
     *
     * @return void
     */
    public function stop()
    {
        $this->transport_->close();
        $this->stop_ = true;

        foreach ($this->childrens as $pid => $num) {
            if (!posix_kill($pid, SIGTERM)) {
                self::log(1, "failed to send SIGTERM to child process($pid): ".posix_strerror(posix_get_last_error()));
            }
        }
    }

    /**
     * Additional signal handling: records the caught termination signal in
     * $catchQuitSignal; the serving loops poll that flag and stop.
     *
     * @param int $signo signal number
     * @return void
     */
    public static function sig_handler($signo)
    {
        switch ($signo) {
            case SIGTERM:
            case SIGHUP:
            case SIGQUIT:
            case SIGTSTP:
                $pid = posix_getpid();
                self::log(0, "process($pid) catch signo: $signo");
                \Thrift\Server\TMultiProcessServer::$catchQuitSignal = $signo;
                break;
            default:
        }
    }

    /**
     * Additional signal handling: installs sig_handler for the signals that
     * should shut the server down.
     *
     * @return void
     */
    private function registerSignalHandler()
    {
        pcntl_signal(SIGTERM, '\Thrift\Server\TMultiProcessServer::sig_handler');
        pcntl_signal(SIGHUP, '\Thrift\Server\TMultiProcessServer::sig_handler');
        pcntl_signal(SIGQUIT, '\Thrift\Server\TMultiProcessServer::sig_handler');
        pcntl_signal(SIGTSTP, '\Thrift\Server\TMultiProcessServer::sig_handler');
        declare(ticks=3);
    }

    /**
     * Additional daemon mode: detaches from the controlling terminal with the
     * usual fork / setsid / fork sequence. Does nothing when the posix
     * extension is unavailable.
     *
     * @return void
     * @throws TException when a fork fails
     */
    private function daemon()
    {
        if (!function_exists('posix_setsid')) {
            return;
        }

        // First fork: the parent exits, the child carries on.
        $pid = pcntl_fork();
        if ($pid === -1) {
            // -1 means no child was created; exiting here would silently kill
            // the only remaining process.
            self::log(1, "Failed to fork");
            throw new TException('Failed to fork');
        }
        if ($pid != 0) {
            exit;
        }

        posix_setsid();

        // Second fork: the session leader exits so the daemon can never
        // re-acquire a controlling terminal.
        $pid = pcntl_fork();
        if ($pid === -1) {
            self::log(1, "Failed to fork");
            throw new TException('Failed to fork');
        }
        if ($pid != 0) {
            exit;
        }
    }

    /**
     * Writes one timestamped line to the log file configured for $type, or
     * to stdout when no file is configured or it cannot be opened.
     *
     * @param int|string $type log channel; key into self::$logFiles
     * @param string     $msg  message text (a newline is appended)
     * @return bool always true
     */
    public static function log($type, $msg)
    {
        // One cached handle per log channel.
        static $fds;

        $msg = date('Y-m-d H:i:s')." $type {$msg}\n";
        $file = !empty(self::$logFiles[$type]) ? self::$logFiles[$type] : '';

        if ($file !== '') {
            if (file_exists($file)) {
                if (empty($fds[$type])) {
                    $fds[$type] = fopen($file, 'a');
                }
                if (!$fds[$type]) {
                    $fds[$type] = fopen('php://stdout', 'w');
                    fwrite($fds[$type], date('Y-m-d H:i:s')." WARNING fopen(".$file.") failed\n");
                }
            } else {
                // The file is missing (first use, or removed since the last
                // call): create the directory if needed and open it afresh.
                if (!is_dir(dirname($file)) && !mkdir(dirname($file), 0755, true)) {
                    $fds[$type] = fopen('php://stdout', 'w');
                    fwrite($fds[$type], date('Y-m-d H:i:s')." WARNING mkdir(".$file.") failed\n");
                } elseif (!($fds[$type] = fopen($file, 'a'))) {
                    $fds[$type] = fopen('php://stdout', 'w');
                    fwrite($fds[$type], date('Y-m-d H:i:s')." WARNING fopen(".$file.") failed\n");
                }
            }
        } else {
            $fds[$type] = fopen('php://stdout', 'w');
        }

        $ret = is_resource($fds[$type]) ? fwrite($fds[$type], $msg) : false;
        if (!$ret && $file !== '') {
            // The cached handle may have gone stale; reopen the file once and
            // retry. Guard every call so a failed fopen() cannot turn into a
            // call on a non-resource.
            if (is_resource($fds[$type])) {
                fclose($fds[$type]);
            }
            $fds[$type] = fopen($file, 'a');
            if ($fds[$type]) {
                fwrite($fds[$type], $msg);
            }
        }

        return true;
    }

    /**
     * Writes the given pid (default: the current process) to self::$pidFile,
     * creating the containing directory when necessary.
     *
     * @param int $pid pid to record; 0 means "use posix_getpid()"
     * @return bool true on success, false when no pid file is configured or
     *              the file could not be written
     */
    public static function createPidFile($pid=0)
    {
        if (!$pid) {
            $pid = posix_getpid();
        }

        // No pid file configured: nothing to write. Without this guard the
        // code below would try to mkdir('') and log a spurious error.
        if (empty(self::$pidFile)) {
            return false;
        }

        $dir = dirname(self::$pidFile);
        if (!file_exists(self::$pidFile) && !is_dir($dir) && !mkdir($dir, 0755, true)) {
            self::log(1, "mkdir(".self::$pidFile.") failed");
            return false;
        }

        $fd = fopen(self::$pidFile, 'w');
        if (!$fd) {
            self::log(1, "fopen(".self::$pidFile.") failed");
            return false;
        }

        // Close the handle on both outcomes so a failed write does not leak it.
        $written = fwrite($fd, "$pid");
        fclose($fd);

        if (!$written) {
            self::log(1, "fwrite(".self::$pidFile.",$pid) failed");
            return false;
        }

        return true;
    }
}