简单多进程任务处理程序 worker = $worker;$this-maxProcess = max(2, (int)$maxProcess);$this-dynamicParam = max(0, (int)$dynamicParam);}/** * fork子进程处理数据 * @param Array $data 需要处理的数据,必须是数组 */p
worker = $worker; $this->maxProcess = max(2, (int)$maxProcess); $this->dynamicParam = max(0, (int)$dynamicParam); } /** * fork子进程处理数据 * @param Array $data 需要处理的数据,必须是数组 */ public function run(&$data) { $count = count($data); // 需要开启的子进程数 $num = $this->dynamicParam ? min( $this->maxProcess, ceil($count / $this->dynamicParam) ) : $this->maxProcess; // 每个进程处理的数据量 $n = ceil($count / $num); $childs = array(); for($i = 0; $i < $count; $i += $n) { $pid = pcntl_fork(); if($pid == -1) { echo "Fork worker failed!"; return false; } if($pid) { echo "Fork worker success! pid:", $pid, "\n"; $childs[] = $pid; } else { $sliceData = array_slice($data, $i, $n); $this->worker->runWorker($sliceData); exit(); } } $this->check($childs); } /** * 检测子进程状态,监控子进程是否退出,并防止僵尸进程 */ protected function check($childs) { while(true) { foreach($childs as $index => $pid) { $pid && $res = pcntl_waitpid($pid, $status, WNOHANG); if(!$pid || $res == -1) { echo "End worker: $pid \n"; unset($childs[$index]); } } if(empty($childs)) break; sleep(1); } } } /** * 使用示例 */ class Test { public function run() { $data = array_fill(0, 800, 1); // 开8个进程将 $data 分成8份,交由下面的 runWorker 方法处理 $task = new TaskHelper($this, 8); $task->run($data); // 如果前面连接了数据库、redis等,最好在这之前关闭掉 } /** * 这里编写代相应码来处理数据 */ public function runWorker($data) { // do something... } } $obj = new Test; $obj->run();