是否有在 Python中实现的GoF Observer的示例性示例?我有一些代码,当前有一些调试代码通过密钥类(如果设置了魔法env,当前生成消息给stderr).此外,该类还有一个接口,用于递增返回结果以及
目前该类的用法类似于:
job = SSHJobMan(hostlist, cmd) job.start() while not job.done(): for each in job.poll(): incrementally_process(job.results[each]) time.sleep(0.2) # or other more useful work post_process(job.results)
一个易用的使用模型是:
job = SSHJobMan(hostlist, cmd) job.wait() # implicitly performs a start() process(job.results)
这一切都适用于当前的实用程序.但它确实缺乏灵活性.例如,我目前支持简短的输出格式或进度条作为增量结果,我也支持
post_process()函数的简短,完整和“合并消息”输出.
但是,我想支持多个结果/输出流(到终端的进度条,对日志文件的调试和警告,从成功作业到一个文件/目录的输出,错误消息以及从非成功作业到另一个作业的其他结果)等).
这听起来像是一个需要Observer的情况……我的类的实例接受来自其他对象的注册,并在发生时使用特定类型的事件回调它们.
我正在看PyPubSub,因为我在SO相关问题中看到了几个引用.我不确定我是否已经准备好将外部依赖项添加到我的实用程序中,但是我可以看到使用它们的接口作为我的模型的价值,如果这将使其他人更容易使用它. (该项目既可以作为独立的命令行实用程序,也可以作为编写其他脚本/实用程序的类).
总之,我知道如何做我想要的……但是有很多方法可以实现它.从长远来看,我想知道最有可能为代码的其他用户工作的建议.
代码本身位于:classh.
However it does lack flexibility.
嗯……实际上,如果您想要的是异步API,这对我来说就像是一个很好的设计.通常是.也许您只需要从stderr切换到Python的logging
模块,它有一种自己的发布/订阅模型,Logger.addHandler()等等.
如果你确实想支持观察者,我的建议是保持简单.你真的只需要几行代码.
class Event(object): pass class Observable(object): def __init__(self): self.callbacks = [] def subscribe(self, callback): self.callbacks.append(callback) def fire(self, **attrs): e = Event() e.source = self for k, v in attrs.iteritems(): setattr(e, k, v) for fn in self.callbacks: fn(e)
您的Job类可以是Observable的子类.当感兴趣的事情发生时,调用self.fire(type =“progress”,percent = 50)等.