swoole 多进程队列消费,使用非阻塞模式,生产者结束,发送消息给消费者,消费者传递结束消息给其他消费者 push($data); echo ' push data_'.$data."\n"; usleep(1000); } // 消息生产结束 $worker-push
push($data); echo '< push data_'.$data."\n"; usleep(1000); } // 消息生产结束 $worker->push('exit'); $worker->exit(0); }); $producer->useQueue($queueName, 2 | swoole_process::IPC_NOWAIT); $processCollect[$producer->start()] = 'product'; // 多个消费者 for ($i = 0; $i < $max_precess; ++$i) { $process = new swoole_process(function (swoole_process $worker) use ($process, $i): void { swoole_set_process_name('test process_'.$i); echo 'test process_'.$i.'_'.posix_getpid()." start\n"; while (true) { $recv = $worker->pop(); if ($recv == false) { continue; } if ($recv == 'exit') { // 传递消息生产结束给其他消费者 $worker->push('exit'); break; } echo '> process_'.$i.'_'.posix_getpid().' pull '.$recv."\n"; } $worker->exit(0); }); $process->useQueue($queueName, 2 | swoole_process::IPC_NOWAIT); $processCollect[$process->start()] = $i; } while (0 != count($processCollect)) { $ret = swoole_process::wait(); if ($ret !== false) { echo '#'.$processCollect[$ret['pid']].' '.$ret['pid']." exit\n"; unset($processCollect[$ret['pid']]); } } $producer->freeQueue();