当前位置 : 主页 > 编程语言 > 其它开发 >

设计模式-生产者消费者模式

来源:互联网 收集:自由互联 发布时间:2022-06-16
生成者消费者模式 解释 什么是生产者和消费者? 两个模块(可能是函数、线程或进程),一个生产数据,一个消费数据,生产数据的模块叫生产者,消费数据的模块叫消费者。 为什么
生成者消费者模式
  1. 解释

    • 什么是生产者和消费者?

      两个模块(可能是函数、线程或进程),一个生产数据,一个消费数据,生产数据的模块叫生产者,消费数据的模块叫消费者。

    • 为什么要使用生产者-消费者模式?

      在多线程开发时,生产者可能产生数据的速度比较快,这时候生产者需要等消费者消费完了才能继续生产数据,因为生产的数据没地方放,如果消费者消费数据的速度比较快,消费者就要等待生产者生产数据,为了解决这种供需不平衡的问题,以达到生产者和消费者之间的一种“平衡”状态,引入“生产者-消费者“模式

    • 什么是生产者消费者模式?

      一个生产者(线程或进程)生产数据,一个消费者(线程或进程)消费数据,它们之间通过一个缓冲区来进行通讯,生产者生产的数据放入缓冲区,消费者从缓冲区取来数据进行消费。

    • 生产者消费者模式有什么优点?

      • 解耦:将生产者类和消费者类进行解耦,消除代码之间的依赖性,简化工作负载的管理
      • 复用:通过将生产者类和消费者类独立开来,那么可以对生产者类和消费者类进行独立的复用与扩展
      • 调整并发数:由于生产者和消费者的处理速度是不一样的,可以调整并发数,给予慢的一方多的并发数,来提高任务的处理速度
      • 异步:对于生产者和消费者来说能够各司其职,生产者只需要关心缓冲区是否还有数据,不需要等待消费者处理完;同样的对于消费者来说,也只需要关注缓冲区的内容,不需要关注生产者,通过异步的方式支持高并发,将一个耗时的流程拆成生产和消费两个阶段,这样生产者因为执行put()的时间比较短,而支持高并发
      • 支持分布式:生产者和消费者通过队列进行通讯,所以不需要运行在同一台机器上,在分布式环境中可以通过redis的list作为队列,而消费者只需要轮询队列中是否有数据。同时还能支持集群的伸缩性,当某台机器宕掉的时候,不会导致整个集群宕掉
  2. 生产者消费者例子

    1. e.g.

      5个厨师做包子,三秒一个,10个食客吃包子,1秒一个,食客吃的又快人又多

      import queue
      import threading
      import time
      
      q = queue.Queue()
      
      
      def cook(i):
          while True:
              q.put(f'厨师{i}做de包子')
              time.sleep(2)
      
      def hangers_on(j):
          while True:
              print(f'食客{j}吃了{q.get()}')
              time.sleep(1)
      
      for i in range(5):
          threading.Thread(target=cook,args=(i,)).start()
      
      for j in range(10):
          threading.Thread(target=hangers_on,args=(j,)).start()
      
      
    2. 用协程实现同样的例子

      q = asyncio.Queue()
      
      
      async def hangers_on(j):
          while True:
              print(f'食客{j}吃了{await q.get()}')
              await asyncio.sleep(3)
      
      async def cook(i):
          while True:
              print(f'厨师{i}做的包子')
              await q.put(f'厨师{i}做的包子')
              await asyncio.sleep(1)
      
      
      async def main():
      
      
          cook_tasks = [asyncio.create_task(cook(j)) for j in range(10)]
          hanges_tasks = [asyncio.create_task(hangers_on(i)) for i in range(10)]
          await asyncio.gather(*cook_tasks,*hanges_tasks,return_exceptions = True)
      
      
      asyncio.run(main())
      
      
上一篇:基于MybatisPlus代码生成器(2.0新版本)
下一篇:没有了
网友评论