当前位置 : 主页 > 网络编程 > PHP >

php消息队列实现详解

来源:互联网 收集:自由互联 发布时间:2023-01-31
目录 常见进程通信方式 系统V IPC消息队列 php创建一个消息队列 读取队列内容 关闭序列化功能 非阻塞读取消息队列 常见进程通信方式 System V IPC 总的包括:消息队列,共享内存、信号
目录
  • 常见进程通信方式
  • 系统V IPC消息队列
  • php创建一个消息队列
    • 读取队列内容
    • 关闭序列化功能
    • 非阻塞读取消息队列

常见进程通信方式

System V IPC 总的包括:消息队列,共享内存、信号量。

IPC(内部进程间通信)的使用注意:

  • IPC资源仅在本机中使用,不能够跨网络使用(其实进程间通信方式除了socket 通信方式其他都是仅在本机中使用)。
  • IPC的资源生存周期与内核相同。除非删除,不然会与系统的生存周期相同。(也就是说如果你不主动删除创建的ipc资源,那么它会一直存在,除非系统关机,它才会被清除)
  • (重要)每个IPC都有一个关键字key。每个IPC资源都有唯一的整型标识符,进程可以使用id对此资源进行访问。

系统V IPC消息队列

消息队列实际上是一个队列,由内核维护,使用msgget (msgget() 函数是linux系统调用函数,由c语言编写) 函数来创建一个消息队列,创建成功返回队列 ID

在php 中通过封装 system V IPC 函数来实现操作消息队列,共享内存,与信号量。我们可以通过php官方文档上查看这些函数的定义,其中msg_ 为首的函数是用来操作消息队列,sem_ 为首的函数是对信号量操作,shm_ 为首的函数是对共享内存的操作

其实 php 这些进程扩展说到底就是对 linux c 系统调用函数与c标准函数库的封装,然后在加一点php语言自己的处理,封装成新的函数供 广大 phper 调用,只要在linux 下运行 不管你是 GO, JAVA ,python 其实都是对系统调用函数的封装,只是封装后的名字不同,但是调用的都是linux 内核提供的同一套接口

php创建一个消息队列

<?php
$key = ftok('demo1.php',"x");//将文件与ID转换为一个key,这个文件只要真实存在就行,没有特殊限制,一般都指定当前文件名
$msqid = msg_get_queue($key);// 创建一个消息队列,返回一个资源id
echo msg_send($msqid,1,"hello"); //往消息队列写入一条数据 hello 到队列中
echo $msqid; // 输出资源id

如何查看创建的消息队列呢?

我们可用通过linux 提供的 ipcs 命令查看

如图,第一列 Message Queues 代表消息队列

keymsqidownerpermsused-bytesmessages0x7801620f5root666121外部标识由调用函数时传入消息队列标识id(用于进程内部通讯)每个队列都有唯一标识,用于区分不同消息队列创建消息队列用户操作权限位消息队列使用了多少字节消息队列有多少条消息

通过表格分析我们刚刚创建的消息队列是一个,消息标识符为5 创建用户为root ,操作权限为666,写入了一条数据,占用了12个字节的队列。

看到这,大家可能发现两个问题?

第一个问题:

为什么我调用 php 函数 msg_get_queue() 创建消息队列返回的不是消息队列标识符5,而是 Resource id #4

因为之前说过 php 函数其实是对linux c 系统调用函数的封装,也就是说 msg_get_queue 函数 其实是对 msgget 系统调用函数的封装,调用msgget 函数的返回内容应该是 消息队列标识id,而现在封装 msgget 函数的 msg_get_queue 函数返回的却是资源描述符,这是为什么?

这其实就是php内部做了处理,返回资源描述符,使得 msg_get_queue 可以配合其他php

函数一起使用,但内部调用msgget 函数返回的一定是 5,也就是消息队列标识符

如果不相信可使用 linux 下提供的 strace 工具 跟踪系统调用。

第二个问题

通过php函数 msg_send 写入了一条数据,数据内容为 ”hello“ 字节长度应该是5个字节,但是消息队列占用的字节长度缺少12个字节?

这是因为php 内部实现对写入数据做了序列化操作,导致写入消息队列的字节长度为12而不是5个字节长度

<?php
$a = serialize("hello");
echo 'hello 序列化长度:'.strlen($a)."\n";
echo 'hello 序列化内容:'.$a."\n";
echo "hello 未序列化长度:".strlen("hello")."\n";

当然如果你不想 msg_send 函数写入队列前对数据进行序列化,可以把第3个参数设置为false,不过需要注意的是,如果写入数据不进行序列化,那么使用 msg_receive 函数读取队列数据时也必须设置为不反序列化操作,不然会引发错误

读取队列内容

<?php
$key = ftok('demo1.php',"x");//将文件与ID转换为一个key
$msqid = msg_get_queue($key);//消息队列如果已创建,直接返回同一个资源描述符
//第一个参数资源描述符,第二个参数一般为0,表示从队列第一条读取,或者设置与msg_send函数第二个参数一致,第三个参数设置缓冲区长度,注意msg_send 函数默认会序列化数据,msg_receive函数默认会将读取数据反序列化,如果缓冲区设置过小会反序列化失败
msg_receive($msqid,1,$received_message_type,1024,$message);//读取消息队列数据,逐条读取,
echo $message.PHP_EOL;

成功从消息队列中读取出一条数据,内容hello

因为消息队列内容被读取,消息队列占用字节被清空为0,消息队列消息条数也为0

通过消息队列的读写操作我们发现,他与上一篇的管道有什么区别?

消息队列不像管道通信写入数据,必须读端要打开,读数据,写端必须要打开,不然会阻塞无法通信,消息队列可以随时往队列写入数据,读取数据

关闭序列化功能

写端

<?php
$key = ftok('demo3.php',"x");//将文件与ID转换为一个key,这个文件只要真实存在就行,没有特殊限制,一般都指定当前文件名
$msqid = msg_get_queue($key);// 创建一个消息队列,返回一个资源id
// 第一个参数资源id,第二个参数消息类型,必须大于0,第三个参数写入队列内容,取消序列化操作
echo msg_send($msqid,1,"hello",false); //往消息队列写入一条数据 hello 到队列中

因为没有对写入数据进行序列化操作,消息队列占用的字节长度变成了5个字节。

读端

<?php
$key = ftok('demo3.php',"x");//将文件与ID转换为一个key
$msqid = msg_get_queue($key);
//第一个参数资源描述符
//第二个参数一般为0,表示从队列第一条读取,或者设置与msg_send函数第二个参数一致
// 第三个参数是msg_send函数的第二个参数的值,也就是发送消息的消息类型
//第四个参数设置接收缓冲区长度,如果设置过小会反序列化失败【如果使用序列化功能时候】
//当接收缓冲区大小设置过小并且关闭了反序列化【发送端发送数据也关闭了】,但是发送的数据超过了设置的长度,就会被截断,并且额外的数据会被丢弃,必须设置第五个参数才有效
// 第五个参数 flags 标志符
msg_receive($msqid,0,$received_message_type,1,$message,false,MSG_NOERROR);
echo $message.PHP_EOL;

只读取到一个h​ 是因为接受缓冲区长度设置为1个字节,所以只接收到一个字节,其它部分被丢弃。

非阻塞读取消息队列

当队列内没有数据,读取队列msg_receive函数是会阻塞的如何让队列无数据读端也不阻塞继续往下执行,可以通过 设置 flags 标志符 为 MSG_IPC_NOWAIT,该标志符作用是,不管队列有无数据立即返回结果

<?php
$key = ftok('demo5.php',"x");//将文件与ID转换为一个key
$msqid = msg_get_queue($key);
// fork 一个子进程
$pid = pcntl_fork();
if($pid == 0){
    while(1){
    // MSG_IPC_NOWAIT【启用非阻塞】 msg_receive 函数其实是封装 linux c msgrcv 系统调用函数,如果设置为非阻塞,函数调用次数非常高,会非常消耗cpu资源
    //阻塞模式调用函数次数低,有数据才返回,对cpu友好
        $ret = msg_receive($msqid,0,$msgType,1024,$msg,true,MSG_IPC_NOWAIT,$error_code);
        if($error_code != MSG_ENOMSG){
            echo $msg."\n";
        }
        sleep(2);
        echo "go"."\n";
    }
    exit(0);
}

父子进程消息队列通信

<?php
$key = ftok('demo5.php',"x");//将文件与ID转换为一个key
$msqid = msg_get_queue($key);
$pid = pcntl_fork();
if($pid == 0){
    while(1){
        $ret = msg_receive($msqid,0,$msgType,1024,$msg,true,MSG_IPC_NOWAIT,$error_code);
        if($error_code != MSG_ENOMSG){
            echo $msg."\n";
        }
    }
    exit(0);
}
$i = 1;
while(1){
    msg_send($msqid,2,"hello world",true);
    sleep(2);
    if($i++ == 3){
    	// 杀死子进程
        posix_kill($pid,SIGILL);
        break;
    }
}
//子进程退出回收子进程,防止孤儿进程
$pid = pcntl_wait($status);
if($pid > 0)
{
    echo "exit pid=".$pid."\n";
}

删除消息队列

<?php
$key = ftok('demo5.php',"x");//将文件与ID转换为一个key
$msqid = msg_get_queue($key);
$pid = pcntl_fork();
if($pid == 0){
    while(1){
        $ret = msg_receive($msqid,0,$msgType,1024,$msg,true,MSG_IPC_NOWAIT,$error_code);
        if($error_code != MSG_ENOMSG){
            echo $msg."\n";
        }

    }
    exit(0);
}
$i = 1;
while(1){
    msg_send($msqid,2,"hello world",true);
    sleep(2);
    if($i++ == 3){
    	// 杀死子进程
        posix_kill($pid,SIGILL);
        break;
    }
}
//子进程退出回收子进程,防止孤儿进程
$pid = pcntl_wait($status);
if($pid > 0)
{
    echo "exit pid=".$pid."\n";
}
//msg_remove_queue 函数移除消息队列
if(msg_remove_queue($msqid)){
    echo "remove ok\n";
}

读取3次队列数据后父进程杀死子进程,父进程回收子进程,然后父进程退出,最后销毁消息队列

删除消息队列也可以直接使用命令 ipcrm -q 7

到此这篇关于php消息队列实现详解的文章就介绍到这了,更多相关php消息队列内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!

上一篇:PHP垃圾回收机制超详细介绍
下一篇:没有了
网友评论