swoole_mq.php serv = new swoole_http_server( "0.0.0.0", "9501" ); $this-serv-set(array( 'worker_num' = 8, //worker number 'max_conn' = 1000, 'max_request' = 100000, 'debug_mode'= 1, 'task_worker_num'=8 //'daemonize' = true, //Is start Daemo
serv = new swoole_http_server(
"0.0.0.0",
"9501"
);
$this->serv->set(array(
'worker_num' => 8, //worker number
'max_conn' => 1000,
'max_request' => 100000,
'debug_mode'=> 1,
'task_worker_num'=>8
//'daemonize' => true, //Is start Daemon process
));
$this->serv->on('Start', array($this, 'onStart'));
$this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
$this->serv->on('connect', array($this, 'onConnect'));
$this->serv->on('request' , array( $this , 'onRequest'));
$this->serv->on('Receive', array($this, 'onReceive'));
$this->serv->on('Close', array($this, 'onClose'));
$this->serv->on('Task', array($this, 'onTask'));
$this->serv->on('Finish', array($this, 'onFinish'));
$this->serv->start();
}
public function onStart( $serv ) {
echo "Start\n";
}
public function onRequest($request, $response){
$this->serv->task(json_encode($request));
$response->end("hello swoole rabbitmq");
}
public function onConnect($serv, $fd){
echo "Connected\n";
}
public function onWorkerStart( $serv , $worker_id) {
//Only deal with worker task to connect rabbitmq!!!
if( $worker_id >= $serv->setting['worker_num'] ) {
#$topic = '/topic/all_news';
#$queue = '/queue/goods/start';
$queue = '/queue/goods_start';
try {
$serv->mq['conn'] = new Stomp("tcp://localhost:61613");
#$con->subscribe($topic);
$serv->mq['conn']->subscribe($queue);
$msg = "test".time();
#$con->send($topic, $msg);
$serv->mq['conn']->send($queue, $msg);
echo "Sent message with body '".$msg."'\n";
} catch(StompException $e) {
die('Connection failed: ' . $e->getMessage());
}
/* read a frame */
while(true) {
$frame = $serv->mq['conn']->readFrame();
$db = new mysqli('127.0.0.1', 'ymshop', 'xxxxxx', 'shop_seo');
$stmt = $db->prepare("show tables");
var_dump($stmt);
if ($frame != null) {
echo $frame->body;
/* acknowledge that the frame was received */
$serv->mq['conn']->ack($frame);
}
}//while
}
}
public function onReceive(swoole_server $serv, $fd, $from_id, $data){
$serv->task("");
}
public function onClose( $serv, $fd, $from_id ) {
echo "Client {$fd} close connection\n";
}
public function onTask($serv,$task_id,$from_id, $data) {
//send msg to rabbitmq
$serv->mq['exchange']->publish($data,$this->mq_queue_name);
}
public function onFinish($serv,$task_id, $data) {
}
}
new server();
