一、信号量 信号量:一套资源 同一时间 只能被n个人访问 某一段代码 同一时间 只能被n个进程执行,控制访问人数 Semaphore用上锁的原理实现的,内置了一个计数器,在同一时间 只能有
一、信号量
信号量:一套资源 同一时间 只能被n个人访问
某一段代码 同一时间 只能被n个进程执行,控制访问人数
Semaphore用上锁的原理实现的,内置了一个计数器,在同一时间 只能有指定数量的进程执行被控制住代码
import random from multiprocessing import Process import time from multiprocessing import Semaphore def ktv(i, sem): sem.acquire() print("%s走进ktv" % i) time.sleep(random.randint(2, 3)) print("%s走出ktv" % i) sem.release() if __name__ == ‘__main__‘: sem = Semaphore(4) for i in range(20): p = Process(target=ktv, args=(i, sem)) p.start()
二、事件
# 通过一个信号 来控制 多个进程 同时执行或阻塞
# 事件 wait() 是依据事件的状态来决定自己是否在wait处阻塞
#False 阻塞 true不阻塞
# 一个信号可以使所有的进程都进入阻塞状态 # 也可以控制所有的进程解除阻塞 # 一个事件被创建之后,默认是阻塞状态 e = Event() # 创建了一个事件 print(e.is_set()) # False 查看一个事件的状态 # e.set() # 将这个事件的状态改为True # print(e.is_set()) # True # e.wait() # 是依据e.is_set()的值来决定是否阻塞 # print(12345) # e.clear() # 将这个事件的状态改为False # print(e.is_set()) # False # e.wait() # print(78910)
三、红绿灯事件
需求:模拟红绿灯
from multiprocessing import Process, Event import time, random def car(e, i): if not e.is_set(): print("\033[33m%s车辆在等待" % i) e.wait() # 阻塞 直到得到一个 事件True的信号 print("car%s车辆通过" % i) def light(e): while True: if e.is_set(): e.clear() # false print("\033[31m红灯亮了\033[0m") else: # e.set() # True print("\033[32m绿灯亮了\033[0m") e.set() time.sleep(2) if __name__ == ‘__main__‘: e = Event() traffic = Process(target=light, args=(e,)) traffic.start() for i in range(20): cars = Process(target=car, args=(e, i)) cars.start() # time.sleep(random.random()) time.sleep(1) # 步骤:1.先执行红绿灯 2.红绿灯在等待2秒过程中,拿到事件
***注意:要对应下面关系,看红灯还是绿灯
四、队列
IPC进程间通信
from multiprocessing import Queue import time q = Queue(5) q.put(1) q.put(2) q.put(3) q.put(4) q.put(5) # q.put(5) # 第六个就开始阻塞 print(q.full()) # True表示队列是否满了 满True /False ,不完全准确,多进程容易被同时访问 print(q.get()) # 1 print(q.get()) # 2 print(q.get()) # 3 print(q.get()) # 4 print(q.get()) # 5 print(q.empty()) # 判断是否已经取空了 空True/False # print(q.get()) # 阻塞了 while True: try: q.get_nowait() except: print("队列空啦") # 用于判断是否空啦,不会报错 time.sleep(0.5) # for i in range(6): # q.put(i)""" import time from multiprocessing import Queue, Process def produce(q): q.put("hello") def consume(q): print(q.get()) if __name__ == ‘__main__‘: q = Queue() p = Process(target=produce, args=(q,)) p.start() # print(q.get()) c = Process(target=consume, args=(q,)) c.start() # 两个子进程之间或者主程序通信,可以通过队列进行通讯队列:先进先出
五、消费生产者模型
# 队列 # 生产者消费者模型 # 生产者 进程 # 消费者 进程 # 买包子案例 from multiprocessing import Process, JoinableQueue import time, random # 生产者 def producer(name, food, q): for i in range(10): time.sleep(random.randint(1, 2)) f = "\033[31m%s生产了%s%s个\033[0m" % (name, food, i) print(f) q.put(f) # q.join() # 阻塞 直到一个队列中的数据 全部被执行完毕 然后就结束进程 # 消费者 def consumer(q, name): while True: food = q.get() # if food is None: # print("%s拿到了一个空" % name) # break print("\033[32m%s吃了了%s\033[0m" % (name, food)) time.sleep(random.randint(1, 2)) q.task_done() # count-1 发送数据处理成功(取值成功) if __name__ == ‘__main__‘: q = JoinableQueue(20) p1 = Process(target=producer, args=("吴凯", "包子", q)) p1.start() p2 = Process(target=producer, args=("坦克", "泔水", q)) p2.start() c1 = Process(target=consumer, args=(q, "猪狗")) c2 = Process(target=consumer, args=(q, "逗比")) c1.daemon = True c2.daemon = True # 把消费者编程守护进程 子进程随着主进程的结束而结束 c1.start() c2.start() p1.join() p2.join() # 感知子进程序结束 q.join() # 直到“队列”中的数据 全部被执行完毕 然后就结束进程 # q.put(None) # q.put(None) """ 在消费者这一端: 每次获取一个数据 处理一个数据 发送一个记号:标志一个数据被处理成功 在生产者这一端: 每一次生产一个数据 且每一次生产的数据都被放在队列中 在队列中刻上一个记号 当生产者全部生产完毕之后 join信号:已经停止生产数据(阻塞),且要等待之前刻上的记号都被消费者消费完 当数据都被处理完时,join结束 """ # 生产者一直在等待消费者把所有数据处理完毕然后结束 (消费者每次处理数据都会发送一个处理成功的记号)