我想使用Control.Concurrent.Async mapConcurrently来使用http-conduit执行并行下载.解决方案 here对我的情况来说是不够的,因为我想处理n个任务,但是将并发工作者的数量限制为m,其中m 最佳答案 也许
withConc :: QSem -> (a -> IO b) -> (a -> Concurrently b) withConc sem f = \a -> Concurrently (bracket_ (waitQSem sem) (signalQSem sem) (f a))
我们可以将withConc与traverse结合使用来执行任何Traversable容器任务的限制并发遍历:
traverseThrottled :: Int -> (a -> IO b) -> [a] -> IO [b] traverseThrottled concLevel action tasks = do sem <- newQSem concLevel runConcurrently (traverse (withConc sem action) tasks)
这种方法的一个缺点是使用Concurrently会创建与任务一样多的线程,并且由于信号量的原因,它们中只有一部分将在任何给定时间执行实际工作.
另一方面,Haskell中的线程很便宜,所以我认为在任务数量不是很大的情况下,这是一个可以接受的解决方案.
编辑:给traverseThrottled一个更一般的签名:
import Data.Traversable import Control.Concurrent import Control.Concurrent.Async import Control.Exception traverseThrottled :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b) traverseThrottled concLevel action taskContainer = do sem <- newQSem concLevel let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action runConcurrently (traverse (Concurrently . throttledAction) taskContainer)
相关文章
- 在Haskell中运行并行URL下载
- 使用Python中的工作池进行异步多处理:如何在超时后继续运行?
- 多线程 - Haskell:次优并行GC工作平衡,并行执行没有加速
- Emacs在haskell模式下挂起,并且进行了低级haskell加载文件调用
- 性能 - 如何在不使用Jobs的情况下并行运行PowerShell脚本?
- 如何使用splat运算符进行并行赋值在Ruby中工作?
- 多线程 - 使用forkIO在Haskell中进行Network.Socket编程“并发,异步,并行,非阻塞”
- 在应用程序上下文中运行Celery工作程序仍然会在任务中引发“在应用程序上下文之外工作”错误
转载注明原文:在Haskell中使用工作池运行并行URL下载 - 代码日志
withConc :: QSem -> (a -> IO b) -> (a -> Concurrently b) withConc sem f = \a -> Concurrently (bracket_ (waitQSem sem) (signalQSem sem) (f a))
我们可以将withConc与traverse结合使用来执行任何Traversable容器任务的限制并发遍历:
traverseThrottled :: Int -> (a -> IO b) -> [a] -> IO [b] traverseThrottled concLevel action tasks = do sem <- newQSem concLevel runConcurrently (traverse (withConc sem action) tasks)
这种方法的一个缺点是使用Concurrently会创建与任务一样多的线程,并且由于信号量的原因,它们中只有一部分将在任何给定时间执行实际工作.
另一方面,Haskell中的线程很便宜,所以我认为在任务数量不是很大的情况下,这是一个可以接受的解决方案.
编辑:给traverseThrottled一个更一般的签名:
import Data.Traversable import Control.Concurrent import Control.Concurrent.Async import Control.Exception traverseThrottled :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b) traverseThrottled concLevel action taskContainer = do sem <- newQSem concLevel let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action runConcurrently (traverse (Concurrently . throttledAction) taskContainer)