当前位置 : 主页 > 网络编程 > PHP >

PHP版多进程服务器

来源:互联网 收集:自由互联 发布时间:2021-06-28
PHP版多进程服务器 workProcessNum = intval($config['workProcessNum']); } if ($this-workProcessNum 1) { self::log(1, "child workProcessNum can not be less than 1"); throw new TException('child workProcessNum can not be less than 1'); }
PHP版多进程服务器
 workProcessNum = intval($config['workProcessNum']);  
        }  
        if ($this->workProcessNum < 1) {  
            self::log(1, "child workProcessNum can not be less than 1");  
            throw new TException('child workProcessNum can not be less than 1');  
        }  
        if (isset($config['maxWorkRequestNum'])) {  
            $this->maxWorkRequestNum = intval($config['maxWorkRequestNum']);  
        }  
        if ($this->maxWorkRequestNum < 1) {  
            self::log(1, "child maxWorkRequestNum can not be less than 1");  
            throw new TException('child maxWorkRequestNum can not be less than 1');  
        }  
        if (isset($config['sendTimeoutSec'])) {  
            $this->sendTimeoutSec = intval($config['sendTimeoutSec']);  
        }  
        if (isset($config['recvTimeoutSec'])) {  
            $this->recvTimeoutSec = intval($config['recvTimeoutSec']);  
        }  
        if ($daemon) {  
            $this->daemon();  
            $this->registerSignalHandler();  
            self::$logFiles = isset($config['logFiles']) && is_array($config['logFiles']) ? $config['logFiles'] : array();  
            self::$pidFile = isset($config['pidFile']) ? $config['pidFile'] : '';  
            declare(ticks=3);  
        }  
        $this->pid = posix_getpid();  
        self::createPidFile($this->pid);  
        self::log(0, "manage process({$this->pid}) has started");  
        $this->transport_->listen();  
        while (!$this->stop_) {  
            while ($this->workProcessNum > 0) {  
                try {  
                    $pid = pcntl_fork();  
                    if ($pid > 0) {  
                        $this->handleParent($pid, $this->workProcessNum);  
                    } else if ($pid === 0) {  
                        $this->pid = posix_getpid();  
                        $this->handleChild($this->workProcessNum);  
                    } else {  
                        self::log(1, "Failed to fork");  
                        throw new TException('Failed to fork');  
                    }  
                    $this->workProcessNum--;  
                } catch (Exception $e) {  
                }  
            }  
            $this->collectChildren();  
            sleep(2);  
            if (\Thrift\Server\TMultiProcessServer::$catchQuitSignal) {  
                $this->stop();  
            }  
        }  
    }  
      
    public function getCurrentWorkRequestNum()  
    {  
        return $this->currentWorkRequestNum;  
    }  
      
    public function getCurrentConnectCallNum()  
    {  
        return $this->currentConnectCallNum;  
    }  
  
    /** 
     * Code run by the parent 
     * 
     * @param int $pid 
     * @param int $num 进程编号 
     * @return void 
     */  
    private function handleParent($pid, $num)  
    {  
        $this->childrens[$pid] = $num;  
    }  
  
    /** 
     * Code run by the child. 
     * 
     * @param int $num 进程编号 
     * @return void 
     */  
    private function handleChild($num)  
    {  
        self::log(0, "child process($this->pid) has started");  
        $this->childrens = array();  
        while (!$this->stop_) {  
            try {  
                $transport = $this->transport_->accept();  
                if ($transport != null) {  
                    $transport->setSendTimeout($this->sendTimeoutSec * 1000);  
                    $transport->setRecvTimeout($this->recvTimeoutSec * 1000);  
                    $this->currentWorkRequestNum++;  
                    $this->currentConnectCallNum = 0;  
                    $inputTransport = $this->inputTransportFactory_->getTransport($transport);  
                    $outputTransport = $this->outputTransportFactory_->getTransport($transport);  
                    $inputProtocol = $this->inputProtocolFactory_->getProtocol($inputTransport);  
                    $outputProtocol = $this->outputProtocolFactory_->getProtocol($outputTransport);  
                    while ($this->processor_->process($inputProtocol, $outputProtocol)) {  
                        $this->currentConnectCallNum++;  
                    }  
                    @$transport->close();  
                }  
            } catch (TTransportException $e) {  
            } catch (Exception $e) {  
                self::log(1, $e->getMessage().'('.$e->getCode().')');  
            }  
            if (\Thrift\Server\TMultiProcessServer::$catchQuitSignal) {  
                $this->stop();  
            }  
            if ($this->currentWorkRequestNum >= $this->maxWorkRequestNum) {  
                self::log(0, "child process($this->pid) has processe {$this->currentWorkRequestNum} requests will be exit");  
                $this->stop();  
                break;  
            }  
        }  
        exit(0);  
    }  
  
    /** 
     * Collects any children we may have 
     * 
     * @return void 
     */  
    private function collectChildren()  
    {  
        foreach ($this->childrens as $pid => $num) {  
            if (pcntl_waitpid($pid, $status, WNOHANG) > 0) {  
                unset($this->childrens[$pid]);  
                $this->workProcessNum++;  
            }  
        }  
    }  
  
    /** 
     * @return void 
     */  
    public function stop()  
    {  
        $this->transport_->close();  
        $this->stop_ = true;  
        foreach ($this->childrens as $pid => $num) {  
            if (!posix_kill($pid, SIGTERM)) {  
            }  
        }  
    }  
  
    /** 
     * 附加信号处理 
     */  
    public static function sig_handler($signo)  
    {  
        switch ($signo) {  
            case SIGTERM:  
            case SIGHUP:  
            case SIGQUIT:  
            case SIGTSTP:  
                $pid = posix_getpid();  
                self::log(0, "process($pid) catch signo: $signo");  
                \Thrift\Server\TMultiProcessServer::$catchQuitSignal = $signo;  
                break;  
            default:  
        }  
    }  
  
    /** 
     * 附加信号处理 
     */  
    private function registerSignalHandler()  
    {  
        pcntl_signal(SIGTERM, '\Thrift\Server\TMultiProcessServer::sig_handler');  
        pcntl_signal(SIGHUP, '\Thrift\Server\TMultiProcessServer::sig_handler');  
        pcntl_signal(SIGQUIT, '\Thrift\Server\TMultiProcessServer::sig_handler');  
        pcntl_signal(SIGTSTP, '\Thrift\Server\TMultiProcessServer::sig_handler');  
        declare(ticks=3);  
    }  
  
    /** 
     * 附加守护进程方式 
     */  
    private function daemon()  
    {  
        if (!function_exists('posix_setsid')) {  
            return;  
        }  
        if (($pid1 = pcntl_fork()) != 0) {  
            exit;  
        }  
        posix_setsid();  
        if (($pid2 = pcntl_fork()) != 0) {  
            exit;  
        }  
    }  
  
    public static function log($type, $msg)  
    {  
        static $fds;  
        $msg = date('Y-m-d H:i:s')." $type {$msg}\n";  
        if (isset(self::$logFiles[$type]) && self::$logFiles[$type]) {  
            if (file_exists(self::$logFiles[$type])) {  
                if (empty($fds[$type])) {  
                    $fds[$type] = fopen(self::$logFiles[$type], 'a');  
                }  
                if (!$fds[$type]) {  
                    $fds[$type] = fopen('php://stdout', 'w');  
                    fwrite($fds[$type], date('Y-m-d H:i:s')." WARNING fopen(".self::$logFiles[$type].") failed\n");  
                }  
            } else {  
                if (!is_dir(dirname(self::$logFiles[$type])) && !mkdir(dirname(self::$logFiles[$type]), 0755, true)) {  
                    $fds[$type] = fopen('php://stdout', 'w');  
                    fwrite($fds[$type], date('Y-m-d H:i:s')." WARNING mkdir(".self::$logFiles[$type].") failed\n");  
                } elseif (!($fds[$type] = fopen(self::$logFiles[$type], 'a'))) {  
                    $fds[$type] = fopen('php://stdout', 'w');  
                    fwrite($fds[$type], date('Y-m-d H:i:s')." WARNING fopen(".self::$logFiles[$type].") failed\n");  
                }  
            }  
        } else {  
            $fds[$type] = fopen('php://stdout', 'w');  
        }  
        $ret = fwrite($fds[$type], $msg);  
        if (!$ret && self::$logFiles[$type]) {  
            fclose($fds[$type]);  
            $fds[$type] = fopen(self::$logFiles[$type], 'a');  
            $ret = fwrite($fds[$type], $msg);  
        }  
        return true;  
    }  
  
    public static function createPidFile($pid=0)  
    {  
        if (!$pid) {  
            $pid = posix_getpid();  
        }  
        if (file_exists(self::$pidFile)) {  
            $fd = fopen(self::$pidFile, 'w');  
            if (!$fd) {  
                self::log(1, "fopen(".self::$pidFile.") failed");  
                return false;  
            }  
        } else {  
            if (!is_dir(dirname(self::$pidFile)) && !mkdir(dirname(self::$pidFile), 0755, true)) {  
                self::log(1, "mkdir(".self::$pidFile.") failed");  
                return false;  
            } elseif (!($fd = fopen(self::$pidFile, 'w'))) {  
                self::log(1, "fopen(".self::$pidFile.") failed");  
                return false;  
            }  
        }  
        if (!fwrite($fd, "$pid")) {  
            self::log(1, "fwrite(".self::$pidFile.",$pid) failed");  
            return false;  
        }  
        fclose($fd);  
        return  true;  
    }  
}
网友评论