当前位置 : 主页 > 网络编程 > PHP >

swoole 队列消费(一个生产者,多个消费者)

来源:互联网 收集:自由互联 发布时间:2021-06-28
swoole 多进程队列消费,使用非阻塞模式,生产者结束,发送消息给消费者,消费者传递结束消息给其他消费者 push($data); echo ' push data_'.$data."\n"; usleep(1000); } // 消息生产结束 $worker-push
swoole 多进程队列消费,使用非阻塞模式,生产者结束,发送消息给消费者,消费者传递结束消息给其他消费者
 push($data);
        echo '< push data_'.$data."\n";
        usleep(1000);
    }
    // 消息生产结束
    $worker->push('exit');

    $worker->exit(0);
});
$producer->useQueue($queueName, 2 | swoole_process::IPC_NOWAIT);
$processCollect[$producer->start()] = 'product';

// 多个消费者
for ($i = 0; $i < $max_precess; ++$i) {
    $process = new swoole_process(function (swoole_process $worker) use ($process, $i): void {
        swoole_set_process_name('test process_'.$i);
        echo 'test process_'.$i.'_'.posix_getpid()." start\n";

        while (true) {
            $recv = $worker->pop();
            if ($recv == false) {
                continue;
            }
            if ($recv == 'exit') {
                // 传递消息生产结束给其他消费者
                $worker->push('exit');
                break;
            }
            echo '> process_'.$i.'_'.posix_getpid().' pull '.$recv."\n";
        }

        $worker->exit(0);
    });
    $process->useQueue($queueName, 2 | swoole_process::IPC_NOWAIT);
    $processCollect[$process->start()] = $i;
}

while (0 != count($processCollect)) {
    $ret = swoole_process::wait();

    if ($ret !== false) {
        echo '#'.$processCollect[$ret['pid']].' '.$ret['pid']." exit\n";
        unset($processCollect[$ret['pid']]);
    }
}
$producer->freeQueue();
网友评论