当前位置 : 主页 > 大数据 > 区块链 >

twisted使用protobuf的网络通信方案

来源:互联网 收集:自由互联 发布时间:2021-06-22
protobuf,google开源的序列化结构数据的方案,官方提供提供了 C++、Java、Python 三种语言的 API,安装使用方法详见: https://github.com/google/protobuf 。 Python中序列/反序列消息时,有两种实现

protobuf,google开源的序列化结构数据的方案,官方提供提供了 C++、Java、Python 三种语言的 API,安装使用方法详见:https://github.com/google/protobuf

Python中序列/反序列消息时,有两种实现,pure-Python implementation 和 C++ implementation,当然后者性能更高,详见: https://github.com/google/protobuf/tree/master/python。

网络通信中,接收方收到数据后,若要使用protobuf进行反序列化,必须要先知道是什么消息,如何确定收到的数据是什么消息呢?protobuf序列化后的数据未包含数据的长度信息,也无终结符,又要如何区分不同的消息呢?

这两问题,可以通过加固定长度的消息头解决。消息头有两部分,一部分是protobuf序列化后的数据长度(len_pb_data), 另一部分是消息名这个字符串的长度(len_msg_name)。实际发送每条消息的包含 len_pb_data,len_msg_name,msg_name, pb_data这四部分内容,如下图:

具体代码如下,调用send方法发送消息,序列化msg,并在前面加上len_pb_data,len_msg_name, msg_name。

twisted服务器收到消息后,会触发dataReceived处理收到的数据,先解析出pb_data,msg_name的长度,然后解析出msg_name,最后根据msg_name使用protobuf进行反序列化。reactor.callLater(0, _func, self, _request)就是调用相应的消息处理函数。

class PbProtocol(protocol.Protocol, policies.TimeoutMixin):
    BUFFER = ''
    timeOut = 500
    header_format = 'IH'
    header_length = struct.calcsize(header_format)
    def connectionMade(self):
        self.transport.setTcpKeepAlive(True)
        self.setTimeout(self.timeOut)
        peer = self.transport.getPeer()

        print 'Connection made. host, port:', peer.host, peer.port

    def dataReceived(self, data):
        self.resetTimeout()
        self.transport.pauseProducing()
        self.BUFFER += data
        buffer_length = len(self.BUFFER)
        _l = ''
        while (buffer_length >= self.header_length):
            len_pb_data, len_msg_name = struct.unpack(self.header_format, self.BUFFER[:self.header_length])#_bound.ParseFromString(self.BUFFER[:8])
            if len_msg_name:
                if len_msg_name > len(self.BUFFER[self.header_length:]):
                    print 'not enough buffer for msg name, wait for new data coming ...   '
                    break
                else:
                    msg_name = struct.unpack('%ds'% len_msg_name,  self.BUFFER[self.header_length:len_msg_name + self.header_length])[0]
                    _func = getattr(self.factory.service, '%s' % msg_name.lower(), None) 
                    _msg =  getattr(login_pb2, msg_name, None)
                    if _func and _msg:
                        _request = getattr(login_pb2, msg_name)()
                        if len_pb_data <= len(self.BUFFER[self.header_length + len_msg_name :]):
                            _request.ParseFromString(self.BUFFER[self.header_length + len_msg_name : self.header_length + len_msg_name + len_pb_data])
                            reactor.callLater(0, _func, self, _request) 
                            self.BUFFER = self.BUFFER[self.header_length + len_msg_name + len_pb_data:]
                            buffer_length = len(self.BUFFER) 
                            continue
                        else:   
                            print 'not enough buffer for pb_data, waiting for new data coming ... '
                            break
                    else:
                        print 'no such message handler. detail:', _func, hasattr(login_pb2, msg_name), repr(self.BUFFER)
                        if self.fromclient:
                            self.transport.loseConnection()
                        else:
                            self.BUFFER = ''

                        return
            else:
                print 'Un-supported message, no msg_name. detail:', len_msg_name
                if self.fromclient:
                    self.transport.loseConnection()
                else:
                    self.BUFFER = ''
                return
            
        self.transport.resumeProducing()
        

    def send(self, msg):
        if msg:
            pb_data = msg.SerializeToString()
            _header = struct.pack(self.header_format + '%ds'%len(msg.__class__.__name__), len(pb_data), len(msg.__class__.__name__), msg.__class__.__name__)
            self.transport.write(_header + pb_data)

以上就是twisted中protobuf的实现,完整源码详见:https://github.com/mxins/twisted-protobuf

loginserver是twisted实现的登陆服务器,robot是登陆机器人,base.py实现了PbProtocol类,login.proto是消息格式定义。

网友评论