swoole 多进程队列消费,使用非阻塞模式,生产者结束,发送消息给消费者,消费者传递结束消息给其他消费者 push($data); echo ' push data_'.$data."\n"; usleep(1000); } // 消息生产结束 $worker-push
          
 push($data);
        echo '< push data_'.$data."\n";
        usleep(1000);
    }
    // 消息生产结束
    $worker->push('exit');
    $worker->exit(0);
});
$producer->useQueue($queueName, 2 | swoole_process::IPC_NOWAIT);
$processCollect[$producer->start()] = 'product';
// 多个消费者
for ($i = 0; $i < $max_precess; ++$i) {
    $process = new swoole_process(function (swoole_process $worker) use ($process, $i): void {
        swoole_set_process_name('test process_'.$i);
        echo 'test process_'.$i.'_'.posix_getpid()." start\n";
        while (true) {
            $recv = $worker->pop();
            if ($recv == false) {
                continue;
            }
            if ($recv == 'exit') {
                // 传递消息生产结束给其他消费者
                $worker->push('exit');
                break;
            }
            echo '> process_'.$i.'_'.posix_getpid().' pull '.$recv."\n";
        }
        $worker->exit(0);
    });
    $process->useQueue($queueName, 2 | swoole_process::IPC_NOWAIT);
    $processCollect[$process->start()] = $i;
}
while (0 != count($processCollect)) {
    $ret = swoole_process::wait();
    if ($ret !== false) {
        echo '#'.$processCollect[$ret['pid']].' '.$ret['pid']." exit\n";
        unset($processCollect[$ret['pid']]);
    }
}
$producer->freeQueue();
        
        