简单多进程任务处理程序 worker = $worker;$this-maxProcess = max(2, (int)$maxProcess);$this-dynamicParam = max(0, (int)$dynamicParam);}/** * fork子进程处理数据 * @param Array $data 需要处理的数据,必须是数组 */p
worker = $worker;
$this->maxProcess = max(2, (int)$maxProcess);
$this->dynamicParam = max(0, (int)$dynamicParam);
}
/**
* fork子进程处理数据
* @param Array $data 需要处理的数据,必须是数组
*/
public function run(&$data) {
$count = count($data);
// 需要开启的子进程数
$num = $this->dynamicParam ? min( $this->maxProcess, ceil($count / $this->dynamicParam) ) : $this->maxProcess;
// 每个进程处理的数据量
$n = ceil($count / $num);
$childs = array();
for($i = 0; $i < $count; $i += $n) {
$pid = pcntl_fork();
if($pid == -1) {
echo "Fork worker failed!";
return false;
}
if($pid) {
echo "Fork worker success! pid:", $pid, "\n";
$childs[] = $pid;
} else {
$sliceData = array_slice($data, $i, $n);
$this->worker->runWorker($sliceData);
exit();
}
}
$this->check($childs);
}
/**
* 检测子进程状态,监控子进程是否退出,并防止僵尸进程
*/
protected function check($childs) {
while(true) {
foreach($childs as $index => $pid) {
$pid && $res = pcntl_waitpid($pid, $status, WNOHANG);
if(!$pid || $res == -1) {
echo "End worker: $pid \n";
unset($childs[$index]);
}
}
if(empty($childs)) break;
sleep(1);
}
}
}
/**
* 使用示例
*/
class Test {
public function run() {
$data = array_fill(0, 800, 1);
// 开8个进程将 $data 分成8份,交由下面的 runWorker 方法处理
$task = new TaskHelper($this, 8);
$task->run($data); // 如果前面连接了数据库、redis等,最好在这之前关闭掉
}
/**
* 这里编写代相应码来处理数据
*/
public function runWorker($data) {
// do something...
}
}
$obj = new Test;
$obj->run();
