当前位置 : 主页 > 网络编程 > PHP >

swoole_mq.php

来源:互联网 收集:自由互联 发布时间:2021-06-28
swoole_mq.php serv = new swoole_http_server( "0.0.0.0", "9501" ); $this->serv->set(array( 'worker_num' => 8, //worker number 'max_conn' => 1000, 'max_request' => 100000, 'debug_mode'=> 1, 'task_worker_num'=>8 //'daemonize' => true, //Is start Daemo
swoole_mq.php
 serv = new swoole_http_server(
            "0.0.0.0",
            "9501"
        );
        $this->serv->set(array(
            'worker_num' => 8,   //worker number
            'max_conn' => 1000,
            'max_request' => 100000,
            'debug_mode'=> 1,
            'task_worker_num'=>8
            //'daemonize' => true, //Is start Daemon process
        ));
        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
        $this->serv->on('connect', array($this, 'onConnect'));
        $this->serv->on('request' , array( $this , 'onRequest'));
        $this->serv->on('Receive', array($this, 'onReceive'));
        $this->serv->on('Close', array($this, 'onClose'));
        $this->serv->on('Task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));
        $this->serv->start();
    }
    /**
     * Handler for the server 'Start' event: prints a startup marker to stdout.
     *
     * @param mixed $serv Server instance handed in by the event dispatcher (not used here).
     */
    public function onStart($serv)
    {
        print "Start\n";
    }
    /**
     * Handler for the 'request' event: hands the JSON-encoded request to a
     * task worker, then answers the client with a fixed body.
     *
     * @param mixed $request  Incoming request object; serialised with json_encode().
     * @param mixed $response Response object; end() is called on it with the reply body.
     */
    public function onRequest($request, $response)
    {
        $payload = json_encode($request);
        $this->serv->task($payload);

        $response->end("hello swoole rabbitmq");
    }
    /**
     * Handler for the 'connect' event: prints a marker for each new connection.
     *
     * @param mixed $serv Server instance (not used here).
     * @param mixed $fd   Connection descriptor (not used here).
     */
    public function onConnect($serv, $fd)
    {
        print "Connected\n";
    }
    /**
     * Handler for the 'WorkerStart' event.
     *
     * For workers whose id is at or above setting['worker_num'] (the task
     * workers), this connects to the broker over STOMP, subscribes to the
     * queue, sends one test message and then loops forever reading and
     * acknowledging frames. Other workers return immediately.
     *
     * NOTE(review): the read loop never returns, so a worker that enters it
     * cannot go on to service anything else — confirm this is intended.
     *
     * @param mixed $serv      Server instance; the STOMP connection is stored in $serv->mq['conn'].
     * @param int   $worker_id Index of the worker being started.
     */
    public function onWorkerStart( $serv , $worker_id) {
        //Only deal with worker task to connect rabbitmq!!!
        if( $worker_id < $serv->setting['worker_num'] ) {
            return;
        }

        $queue  = '/queue/goods_start';
        try {
            $serv->mq['conn'] = new Stomp("tcp://localhost:61613");
            $serv->mq['conn']->subscribe($queue);
            $msg = "test".time();
            $serv->mq['conn']->send($queue, $msg);
            echo "Sent message with body '".$msg."'\n";
        } catch(StompException $e) {
            die('Connection failed: ' . $e->getMessage());
        }

        // Open the database connection once; the original reconnected on
        // every pass of the loop below, which is loop-invariant work.
        $db = new mysqli('127.0.0.1', 'ymshop', 'xxxxxx', 'shop_seo');

        /* read a frame */
        while(true) {
            $frame = $serv->mq['conn']->readFrame();

            $stmt = $db->prepare("show tables");
            var_dump($stmt);
            // Release the statement so handles do not pile up across iterations.
            if ($stmt instanceof mysqli_stmt) {
                $stmt->close();
            }

            // readFrame() yields false when no frame is available; test for
            // that explicitly instead of relying on a loose comparison to null.
            if ($frame !== false && $frame !== null) {
                echo $frame->body;
                /* acknowledge that the frame was received */
                $serv->mq['conn']->ack($frame);
            }
        }//while
    }
    /**
     * Handler for the 'Receive' event: dispatches an empty-string task to the
     * task workers, ignoring the received payload.
     *
     * @param swoole_server $serv    Server instance used to dispatch the task.
     * @param mixed         $fd      Connection descriptor (not used here).
     * @param mixed         $from_id Origin id supplied by the dispatcher (not used here).
     * @param mixed         $data    Received data (not used here).
     */
    public function onReceive(swoole_server $serv, $fd, $from_id, $data)
    {
        $emptyPayload = "";
        $serv->task($emptyPayload);
    }
    /**
     * Handler for the 'Close' event: prints which client descriptor closed.
     *
     * @param mixed $serv    Server instance (not used here).
     * @param mixed $fd      Descriptor of the connection that closed; echoed in the message.
     * @param mixed $from_id Origin id supplied by the dispatcher (not used here).
     */
    public function onClose($serv, $fd, $from_id)
    {
        print "Client {$fd} close connection\n";
    }
    /**
     * Handler for the 'Task' event: publishes the task payload through the
     * exchange stored in $serv->mq['exchange'].
     *
     * NOTE(review): no visible code assigns $serv->mq['exchange'] (only
     * mq['conn'] is set in onWorkerStart), and the declaration of
     * $this->mq_queue_name is not visible here — confirm both are
     * initialised elsewhere. Until then the guard below avoids a fatal
     * "call to a member function on null" by dropping the task with a log line.
     *
     * @param mixed $serv    Server instance carrying the mq handles.
     * @param mixed $task_id Id of the task; used only in the log line.
     * @param mixed $from_id Id of the worker that dispatched the task (not used here).
     * @param mixed $data    Payload to publish.
     */
    public function onTask($serv,$task_id,$from_id, $data) {
        if (!isset($serv->mq['exchange'])) {
            echo "Task {$task_id} dropped: mq exchange is not initialised\n";
            return;
        }
        //send msg to rabbitmq
        $serv->mq['exchange']->publish($data,$this->mq_queue_name);
    }
    /**
     * Handler for the 'Finish' event. Intentionally does nothing.
     *
     * @param mixed $serv    Server instance (not used here).
     * @param mixed $task_id Id of the finished task (not used here).
     * @param mixed $data    Result data supplied by the dispatcher (not used here).
     */
    public function onFinish($serv, $task_id, $data)
    {
        // No post-processing of task results.
    }
}
// Script entry point: instantiate the server class.
// NOTE(review): the constructor's opening lines are not visible in this
// listing; its visible tail registers the event handlers and calls start().
new server();
上一篇:putputPHPExcel.txt
下一篇:创建目录.php
网友评论