protobuf,google开源的序列化结构数据的方案,官方提供提供了 C++、Java、Python 三种语言的 API,安装使用方法详见:https://github.com/google/protobuf。
Python中序列/反序列消息时,有两种实现,pure-Python implementation 和 C++ implementation,当然后者性能更高,详见: https://github.com/google/protobuf/tree/master/python。
网络通信中,接收方收到数据后,若要使用protobuf进行反序列化,必须要先知道是什么消息,如何确定收到的数据是什么消息呢?protobuf序列化后的数据未包含数据的长度信息,也无终结符,又要如何区分不同的消息呢?
这两问题,可以通过加固定长度的消息头解决。消息头有两部分,一部分是protobuf序列化后的数据长度(len_pb_data), 另一部分是消息名这个字符串的长度(len_msg_name)。实际发送每条消息的包含 len_pb_data,len_msg_name,msg_name, pb_data这四部分内容,如下图:
具体代码如下,调用send方法发送消息,序列化msg,并在前面加上len_pb_data,len_msg_name, msg_name。
twisted服务器收到消息后,会触发dataReceived处理收到的数据,先解析出pb_data,msg_name的长度,然后解析出msg_name,最后根据msg_name使用protobuf进行反序列化。reactor.callLater(0, _func, self, _request)就是调用相应的消息处理函数。
class PbProtocol(protocol.Protocol, policies.TimeoutMixin): BUFFER = '' timeOut = 500 header_format = 'IH' header_length = struct.calcsize(header_format) def connectionMade(self): self.transport.setTcpKeepAlive(True) self.setTimeout(self.timeOut) peer = self.transport.getPeer() print 'Connection made. host, port:', peer.host, peer.port def dataReceived(self, data): self.resetTimeout() self.transport.pauseProducing() self.BUFFER += data buffer_length = len(self.BUFFER) _l = '' while (buffer_length >= self.header_length): len_pb_data, len_msg_name = struct.unpack(self.header_format, self.BUFFER[:self.header_length])#_bound.ParseFromString(self.BUFFER[:8]) if len_msg_name: if len_msg_name > len(self.BUFFER[self.header_length:]): print 'not enough buffer for msg name, wait for new data coming ... ' break else: msg_name = struct.unpack('%ds'% len_msg_name, self.BUFFER[self.header_length:len_msg_name + self.header_length])[0] _func = getattr(self.factory.service, '%s' % msg_name.lower(), None) _msg = getattr(login_pb2, msg_name, None) if _func and _msg: _request = getattr(login_pb2, msg_name)() if len_pb_data <= len(self.BUFFER[self.header_length + len_msg_name :]): _request.ParseFromString(self.BUFFER[self.header_length + len_msg_name : self.header_length + len_msg_name + len_pb_data]) reactor.callLater(0, _func, self, _request) self.BUFFER = self.BUFFER[self.header_length + len_msg_name + len_pb_data:] buffer_length = len(self.BUFFER) continue else: print 'not enough buffer for pb_data, waiting for new data coming ... ' break else: print 'no such message handler. detail:', _func, hasattr(login_pb2, msg_name), repr(self.BUFFER) if self.fromclient: self.transport.loseConnection() else: self.BUFFER = '' return else: print 'Un-supported message, no msg_name. detail:', len_msg_name if self.fromclient: self.transport.loseConnection() else: self.BUFFER = '' return self.transport.resumeProducing() def send(self, msg): if msg: pb_data = msg.SerializeToString() _header = struct.pack(self.header_format + '%ds'%len(msg.__class__.__name__), len(pb_data), len(msg.__class__.__name__), msg.__class__.__name__) self.transport.write(_header + pb_data)
以上就是twisted中protobuf的实现,完整源码详见:https://github.com/mxins/twisted-protobuf
loginserver是twisted实现的登陆服务器,robot是登陆机器人,base.py实现了PbProtocol类,login.proto是消息格式定义。