当前位置 : 主页 > 网络编程 > PHP >

简单多进程任务处理程序

来源:互联网 收集:自由互联 发布时间:2021-06-28
简单多进程任务处理程序 worker = $worker;$this-maxProcess = max(2, (int)$maxProcess);$this-dynamicParam = max(0, (int)$dynamicParam);}/** * fork子进程处理数据 * @param Array $data 需要处理的数据,必须是数组 */p
简单多进程任务处理程序
 worker = $worker;
		$this->maxProcess = max(2, (int)$maxProcess);
		$this->dynamicParam = max(0, (int)$dynamicParam);
	}
	
	/**
	 * fork子进程处理数据
	 * @param Array $data 需要处理的数据,必须是数组
	 */
	public function run(&$data) {
		$count = count($data);
		
		// 需要开启的子进程数
		$num = $this->dynamicParam ? min( $this->maxProcess, ceil($count / $this->dynamicParam) ) : $this->maxProcess;
		
		// 每个进程处理的数据量
		$n = ceil($count / $num); 
		
		$childs = array();
		
		for($i = 0; $i < $count; $i += $n) {
			$pid = pcntl_fork();
			if($pid == -1) {
				echo "Fork worker failed!";
				return false;
			}
			
			if($pid) {
				echo "Fork worker success! pid:",  $pid, "\n";
				$childs[] = $pid;
			} else {
				$sliceData = array_slice($data, $i, $n);
	
				$this->worker->runWorker($sliceData);
				exit();
			}
		}
		
		$this->check($childs);
	}
	
	/**
	 * 检测子进程状态,监控子进程是否退出,并防止僵尸进程
	 */
	protected function check($childs) {
		while(true) {
			foreach($childs as $index => $pid) {
				$pid && $res = pcntl_waitpid($pid, $status, WNOHANG);
				if(!$pid || $res == -1) {
					echo "End worker: $pid \n";
					unset($childs[$index]);
				}
			}
			
			if(empty($childs)) break;
			sleep(1);
		}
	}
}

/**
 * 使用示例
 */
class Test {
	public function run() {
		$data = array_fill(0, 800, 1);
		
		// 开8个进程将 $data 分成8份,交由下面的 runWorker 方法处理
		$task = new TaskHelper($this, 8);
		
		$task->run($data); // 如果前面连接了数据库、redis等,最好在这之前关闭掉
	}
	
	/**
	 * 这里编写代相应码来处理数据
	 */
	public function runWorker($data) {
		// do something...
	}
}

$obj = new Test;
$obj->run();
上一篇:Dynamic cache
下一篇:Captcha.php
网友评论