The goal is to have a conduit with the following type signature:
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
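The conduit should repeatedly parse protocol buffers (using the ByteString -> a function) that are received over TCP/IP (using the network-conduit package).
The wire message format is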
{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...
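(The curly braces are not part of the protocol; they are only used here to separate the entities.)
My first idea was to use sequenceSink to repeatedly apply a Sink that is able to parse one ProtoBuf: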
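To make the framing concrete, here is a minimal sketch using only the `binary` and `bytestring` packages. `encodeFrame`, `decodeLength` and `splitFrames` are hypothetical helper names, not library functions. `splitFrames` returns every complete frame in a buffer together with the bytes that do not yet form a complete frame. That remainder is exactly what must be kept and combined with the next chunk read from the socket.

```haskell
import qualified Data.Binary.Put as P
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Bits (shiftL, (.|.))
import Data.Word (Word32)

-- | Prefix a payload with its length as a 32-bit big-endian integer.
encodeFrame :: BS.ByteString -> BSL.ByteString
encodeFrame payload = P.runPut $ do
  P.putWord32be (fromIntegral (BS.length payload))
  P.putByteString payload

-- | Decode a 32-bit big-endian length from exactly four bytes.
decodeLength :: BS.ByteString -> Int
decodeLength = fromIntegral . BS.foldl' step (0 :: Word32)
  where
    step acc b = (acc `shiftL` 8) .|. fromIntegral b

-- | Split a buffer into complete frames. The second component is the
-- incomplete remainder that has to be prepended to the next chunk.
splitFrames :: BS.ByteString -> ([BS.ByteString], BS.ByteString)
splitFrames buf
  | BS.length buf < 4 = ([], buf)         -- not even a full length header
  | BS.length body < len = ([], buf)      -- header present, payload incomplete
  | otherwise =
      let (frames, rest') = splitFrames rest
      in (payload : frames, rest')
  where
    (lenBytes, body) = BS.splitAt 4 buf
    len = decodeLength lenBytes
    (payload, rest) = BS.splitAt len body
```

For example, `splitFrames (BSL.toStrict (encodeFrame (BS.pack [104, 105])) <> BS.pack [0, 0])` gives one frame and keeps the two trailing header bytes as the remainder.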
[...]
import qualified Data.Binary as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util as CU

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
    CU.sequenceSink () $ \() -> do
        lenBytes <- CB.take 4                    -- read ProtoBuf length
        let len :: Word32
            len = B.decode lenBytes              -- decode ProtoBuf length
            intLen = fromIntegral len
        protobufBytes <- CB.take intLen          -- read the ProtoBuf bytes
        return $ CU.Emit () [ protobufDecode protobufBytes ]  -- emit decoded ProtoBuf
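It doesn't work (it only works for the first protocol buffer) because, apparently, a number of "leftover" bytes have already been read from the source but are not consumed by CB.take.
I found no way to push "the rest back to the source".
Is my concept completely wrong?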
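Newer versions of conduit do provide a way to push unconsumed input back upstream: `leftover`. The sketch below assumes a current `conduit` release (the 1.3-style `ConduitT` type), not the `Conduit`/`Pipe` API used in this question. `takeBytes` is a hypothetical helper showing the mechanism: it consumes exactly `n` bytes and hands the unused tail of the last chunk back, so the next consumer in the same pipeline still sees it.

```haskell
import Data.Conduit (ConduitT, await, leftover, runConduitPure, (.|))
import qualified Data.Conduit.List as CL
import qualified Data.ByteString as BS

-- | Consume exactly n bytes (fewer if the stream ends first).
-- Any bytes of the last chunk beyond n are pushed back with 'leftover'
-- instead of being dropped.
takeBytes :: Monad m => Int -> ConduitT BS.ByteString o m BS.ByteString
takeBytes n0 = go n0 []
  where
    go 0 acc = return (BS.concat (reverse acc))
    go n acc = do
      mchunk <- await
      case mchunk of
        Nothing -> return (BS.concat (reverse acc))  -- stream ended early
        Just chunk -> do
          let (want, rest) = BS.splitAt n chunk
          if BS.null rest
            then go (n - BS.length want) (want : acc)
            else do
              leftover rest  -- push the unused tail back upstream
              return (BS.concat (reverse (want : acc)))
```

With `CL.sourceList [BS.pack [1, 2, 3], BS.pack [4, 5]]` as the source, running `(,) <$> takeBytes 2 <*> CL.consume` yields `[1, 2]` for the first component, and the consumer that follows still receives the byte `3` ahead of the `[4, 5]` chunk.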
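PS: Even though I use protocol buffers here, the problem is not related to protocol buffers. To debug the problem, I always use {length}{UTF-8 encoded string}{length}{UTF-8 encoded string}... and a conduit similar to the one above (utf8StringConduit :: MonadResource m => Conduit ByteString m Text).
Update:
I just tried to replace the state (there is no state, only (), in the example above) with the remaining bytes, and to replace the CB.take calls with a function that first consumes the already-read bytes (from the state) and only calls await when needed (when the state does not hold enough bytes). Unfortunately, this does not work either, because sequenceSink stops executing the code as soon as the Source has no bytes left, even though the state still contains remaining bytes :-(.
In case you are interested in the code (it is neither optimized nor very nice, but it should be enough for testing):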
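To reproduce the chunk-boundary problem deterministically, it helps to feed the conduit a stream whose chunk boundaries deliberately do not line up with frame boundaries. A minimal sketch of such test input, assuming the `binary`, `bytestring` and `text` packages; `encodeUtf8Frames` and `chunksOf` are hypothetical helper names.

```haskell
import qualified Data.Binary.Put as P
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE

-- | Encode strings as {length}{UTF-8 bytes} frames, where the length is
-- the number of UTF-8 bytes as a 32-bit big-endian integer.
encodeUtf8Frames :: [T.Text] -> BS.ByteString
encodeUtf8Frames = BSL.toStrict . P.runPut . mapM_ putFrame
  where
    putFrame t = do
      let bytes = TE.encodeUtf8 t
      P.putWord32be (fromIntegral (BS.length bytes))
      P.putByteString bytes

-- | Cut a stream into chunks of at most n bytes (n must be positive),
-- so that frame boundaries fall in the middle of chunks.
chunksOf :: Int -> BS.ByteString -> [BS.ByteString]
chunksOf n bs
  | BS.null bs = []
  | otherwise = let (h, t) = BS.splitAt n bs in h : chunksOf n t
```

The resulting list of chunks can be fed to the conduit under test with `Data.Conduit.List.sourceList`; with a chunk size of 3, for example, every length header is split across two chunks.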
utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit = CU.sequenceSink [] $ \st -> do
    (lengthBytes, st') <- takeWithState BS.empty st 4
    let len :: Word32
        len = B.decode $ BSL.fromChunks [lengthBytes]
        intLength = fromIntegral len
    (textBytes, st'') <- takeWithState BS.empty st' intLength
    return $ CU.Emit st'' [ TE.decodeUtf8 textBytes ]

takeWithState :: Monad m
              => ByteString
              -> [ByteString]
              -> Int
              -> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state 0 = return (acc, state)
takeWithState acc state neededLen =
    let stateLenSum = foldl' (+) 0 $ map BS.length state
    in if stateLenSum >= neededLen
       then do
           let (firstChunk:state') = state
               (neededChunk, pushBack) = BS.splitAt neededLen firstChunk
               acc' = acc `BS.append` neededChunk
               neededLen' = neededLen - BS.length neededChunk
               state'' = if BS.null pushBack then state' else pushBack:state'
           takeWithState acc' state'' neededLen'
       else do
           aM <- await
           case aM of
               Just a -> takeWithState acc (state ++ [a]) neededLen
               Nothing -> error "to be fixed later"

For protocol buffer parsing and serialization we use messageWithLengthPutM and messageWithLengthGetM (see below), but I think they use varint encoding for the length, which is not what you need. I would probably try to adapt our implementation by replacing messageWithLengthGetM/messageWithLengthPutM with something like
myMessageWithLengthGetM = do
    size <- getWord32be
    getMessageWithSize size
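but I don't know how to implement getMessageWithSize with the functions available in the protocol-buffers package. On the other hand, you could just use getByteString and then "re-parse" the bytestring.
Regarding conduits: have you tried implementing the conduit without Data.Conduit.Util? Something like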
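A minimal sketch of the getByteString route, written against `Data.Binary.Get` from the `binary` package rather than the protocol-buffers `Get` monad. `getFrame` and `getFramesWith` are hypothetical names, and the decoder argument stands in for a function like `protobufDecode`. Note that `runGet` expects the complete input as one lazy ByteString, so on its own this does not solve the chunking problem.

```haskell
import qualified Data.Binary.Get as G
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL

-- | Read one frame: a 32-bit big-endian length followed by that many bytes.
getFrame :: G.Get BS.ByteString
getFrame = do
  size <- G.getWord32be
  G.getByteString (fromIntegral size)

-- | Read frames until the input is exhausted, "re-parsing" each payload
-- with a caller-supplied decoder.
getFramesWith :: (BS.ByteString -> a) -> G.Get [a]
getFramesWith decode = do
  done <- G.isEmpty
  if done
    then return []
    else do
      payload <- getFrame
      rest <- getFramesWith decode
      return (decode payload : rest)
```

For example, `G.runGet (getFramesWith BS.length) (BSL.pack [0, 0, 0, 2, 7, 8, 0, 0, 0, 1, 9])` reads two frames and returns their payload lengths, `[2, 1]`.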
protobufConduit protobufDecode = loop
  where
    loop = do
        len <- liftM convertLen (CB.take 4)
        bs <- CB.take len
        yield (protobufDecode bs)
        loop
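A runnable version of this loop, sketched against a current `conduit` (1.3-style `ConduitT`) plus `Data.Conduit.Binary` from `conduit-extra`. The constraint is relaxed from `MonadResource m` to `Monad m`, `convertLen` is filled in here as a big-endian decode, and the end-of-stream checks are an addition that the sketch above leaves out. The reason this shape works where the `sequenceSink` version did not is that `CB.take` hands the unconsumed part of a chunk back as a leftover, so the next iteration of `loop` sees it.

```haskell
import Data.Conduit (ConduitT, yield, runConduitPure, (.|))
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Bits (shiftL, (.|.))

-- | Repeatedly read {length}{payload} frames and yield decoded payloads.
protobufConduit :: Monad m => (BS.ByteString -> a) -> ConduitT BS.ByteString a m ()
protobufConduit decode = loop
  where
    loop = do
      lenBytes <- CB.take 4
      if BSL.length lenBytes < 4
        then return ()  -- end of stream (a truncated header is dropped)
        else do
          let n = convertLen lenBytes
          body <- CB.take n
          if BSL.length body < fromIntegral n
            then return ()  -- stream ended in the middle of a payload
            else do
              yield (decode (BSL.toStrict body))
              loop

    -- Interpret four bytes as a 32-bit big-endian length.
    convertLen :: BSL.ByteString -> Int
    convertLen = BSL.foldl' (\acc b -> (acc `shiftL` 8) .|. fromIntegral b) 0
```

Usage, with chunk boundaries that cut through both headers: `runConduitPure (CL.sourceList chunks .| protobufConduit id .| CL.consume)`.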
This is the code we use:
pbufSerialize :: (ReflectDescriptor w, Wire w) => Conduit w IO ByteString
pbufSerialize = awaitForever f
  where
    f pb = M.mapM_ yield $ BSL.toChunks $ runPut (messageWithLengthPutM pb)

pbufParse :: (ReflectDescriptor w, Wire w, Show w) => Conduit ByteString IO w
pbufParse = new
  where
    new = read (runGet messageWithLengthGetM . BSL.fromChunks . (:[]))
    read parse = do
        mbs <- await
        case mbs of
            Just bs -> checkResult (parse bs)
            Nothing -> return ()
    checkResult result = case result of
        Failed _ errmsg -> fail errmsg
        Partial cont -> read (cont . Just . BSL.fromChunks . (:[]))
        Finished rest _ msg -> do
            yield msg
            checkResult (runGet messageWithLengthGetM rest)
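The pattern in pbufParse (wait for more input on `Partial`, restart the decoder on the remainder after `Finished`) can also be reproduced with the incremental interface of `Data.Binary.Get`, using a fixed 32-bit big-endian length instead of the varint prefix of `messageWithLengthGetM`. This is a pure sketch over a list of chunks, not the code above; `getFrame` and `decodeChunks` are hypothetical names. Inside a conduit, the same `go`/`feed` steps would be driven by `await` and `yield`.

```haskell
import qualified Data.Binary.Get as G
import qualified Data.ByteString as BS

-- | One frame: a 32-bit big-endian length followed by the payload.
getFrame :: G.Get BS.ByteString
getFrame = G.getWord32be >>= G.getByteString . fromIntegral

-- | Decode frames from a chunked stream. 'Nothing' means no frame is
-- half-read; 'Just dec' is a decoder waiting for more bytes.
decodeChunks :: [BS.ByteString] -> Either String [BS.ByteString]
decodeChunks = go Nothing
  where
    fresh = G.runGetIncremental getFrame

    go Nothing [] = Right []
    go (Just _) [] = Left "stream ended in the middle of a frame"
    go pending (c : cs)
      | BS.null c = go pending cs  -- skip empty chunks
      | otherwise = feed (G.pushChunk (maybe fresh id pending) c) cs

    -- Emit every complete frame, then wait for the next chunk.
    feed dec cs = case dec of
      G.Fail _ _ err -> Left err
      G.Partial _ -> go (Just dec) cs
      G.Done rest _ frame
        | BS.null rest -> (frame :) <$> go Nothing cs
        | otherwise -> (frame :) <$> feed (G.pushChunk fresh rest) cs
```

For example, the chunks `[0,0]`, `[0,2,104,105,0,0,0]` and `[1,33]` decode to the two payloads `[104,105]` and `[33]`, even though both length headers straddle chunk boundaries.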