swoole 多进程队列消费,使用非阻塞模式,生产者结束,发送消息给消费者,消费者传递结束消息给其他消费者 push($data); echo ' push data_'.$data."\n"; usleep(1000); } // 消息生产结束 $worker-push
push($data);
echo '< push data_'.$data."\n";
usleep(1000);
}
// 消息生产结束
$worker->push('exit');
$worker->exit(0);
});
$producer->useQueue($queueName, 2 | swoole_process::IPC_NOWAIT);
$processCollect[$producer->start()] = 'product';
// 多个消费者
for ($i = 0; $i < $max_precess; ++$i) {
$process = new swoole_process(function (swoole_process $worker) use ($process, $i): void {
swoole_set_process_name('test process_'.$i);
echo 'test process_'.$i.'_'.posix_getpid()." start\n";
while (true) {
$recv = $worker->pop();
if ($recv == false) {
continue;
}
if ($recv == 'exit') {
// 传递消息生产结束给其他消费者
$worker->push('exit');
break;
}
echo '> process_'.$i.'_'.posix_getpid().' pull '.$recv."\n";
}
$worker->exit(0);
});
$process->useQueue($queueName, 2 | swoole_process::IPC_NOWAIT);
$processCollect[$process->start()] = $i;
}
while (0 != count($processCollect)) {
$ret = swoole_process::wait();
if ($ret !== false) {
echo '#'.$processCollect[$ret['pid']].' '.$ret['pid']." exit\n";
unset($processCollect[$ret['pid']]);
}
}
$producer->freeQueue();
