在我正在处理的应用程序中,用户可以执行“转换”,其中包含“步骤”.步骤可以对其他步骤具有任意数量的依赖性.我希望能够调用转换并将这些步骤作为单独的Celery任务并行执行. 理想
理想情况下,我喜欢celery-tasktree行,除了一般的有向非循环图,而不仅仅是树,但似乎还没有这样的库存在.
首先想到的解决方案是对标准拓扑排序的并行调整 – 而不是确定满足依赖关系的步骤的线性排序,我们确定可以在开始时并行执行的整个步骤集,然后是可以在第2轮中执行的整个步骤集,依此类推.
但是,当任务占用不同的时间并且工作人员必须空闲等待较长时间运行的任务时,这不是最佳的,而现在有任务可以运行. (对于我的具体应用,此解决方案现在可能还不错,但我仍然想弄清楚如何优化它.)
如https://cs.stackexchange.com/questions/2524/getting-parallel-items-in-dependency-resolution所述,更好的方法是直接在DAG之外运行 – 在每个任务完成后,检查其任何依赖任务是否现在能够运行,如果是,则运行它们.
实现这样的事情最好的方法是什么?我不清楚有一种简单的方法可以做到这一点.
据我所知,Celery的组/链/和弦原语不够灵活,不足以让我表达完整的DAG – 虽然我可能在这里错了?
我想我可以为当前任务完成后通知相关任务的任务创建一个包装器 – 我不确定处理这种通知的最佳方法是什么.访问应用程序的Django数据库并不是特别简洁,并且很难将其转换为通用库,但Celery本身并没有为此提供明显的机制.
我也遇到了这个问题,但我找不到更好的解决方案或图书馆,除了一个图书馆,对于任何有兴趣的人,你都可以结账https://github.com/selinon/selinon.虽然它仅用于python 3,但它似乎是唯一能完全符合你想要的东西.
气流是另一种选择,但气流在更静态的环境中使用,就像其他dag库一样.