目录 进程互斥锁 基本概念 互斥锁的使用 IPC 基本概念 队列 生产者消费者模型 基本概念 代码实现 线程 基本概念 创建线程 线程互斥锁 进程互斥锁 基本概念 临界资源: 一次仅允许一个
目录
- 进程互斥锁
- 基本概念
- 互斥锁的使用
- IPC
- 基本概念
- 队列
- 生产者消费者模型
- 基本概念
- 代码实现
- 线程
- 基本概念
- 创建线程
- 线程互斥锁
进程互斥锁
基本概念
- 临界资源: 一次仅允许一个进程使用的资源称为临界资源, 进程间采取互斥的方式, 共享临界资源
- 进程互斥: 一个进程正在访问临界资源, 另一个要访问该资源的进程必须等待
- 让并发变成串形, 牺牲了执行效率, 保证了数据的安全
- 在程序并发执行时, 需要修改时使用
互斥锁的使用
# base_data--->{"ticket_num": 1} # 模拟抢票软件 import json import time from multiprocessing import Process # 查看余票 def search(user): with open('base_data', 'r', encoding='utf-8') as f: dic = json.load(f) ticket_num = dic.get('ticket_num') print(f'用户{user}正在查看余票, 当前余票{ticket_num}张...') # 购买车票 def buy(user): with open('base_data', 'r', encoding='utf-8') as f: dic = json.load(f) # 阻塞 time.sleep(1) if dic.get('ticket_num') > 0: dic['ticket_num'] -= 1 with open('base_data', 'w', encoding='utf-8') as f1: json.dump(dic, f1) print(f'用户[{user}]抢票成功!') else: print(f'用户[{user}]抢票失败!') # 开始抢票 def run(user): search(user) buy(user) if __name__ == '__main__': for i in range(10): # 并发开启10个子进程 p = Process(target=run, args=(f'{i}',)) p.start() ''' 用户8正在查看余票, 当前余票1张... 用户6正在查看余票, 当前余票1张... 用户3正在查看余票, 当前余票1张... 用户0正在查看余票, 当前余票1张... 用户4正在查看余票, 当前余票1张... 用户2正在查看余票, 当前余票1张... 用户1正在查看余票, 当前余票1张... 用户7正在查看余票, 当前余票1张... 用户5正在查看余票, 当前余票1张... 用户9正在查看余票, 当前余票1张... 用户[8]抢票成功! 用户[6]抢票成功! 用户[3]抢票成功! 用户[4]抢票成功! 用户[1]抢票成功! 用户[5]抢票成功! 用户[2]抢票成功! 用户[0]抢票成功! 用户[7]抢票成功! 用户[9]抢票成功! '''
使用进程锁将并发变成串行
# 模拟抢票软件 import json import time from multiprocessing import Process from multiprocessing import Lock # 查看余票 def search(user): with open('base_data', 'r', encoding='utf-8') as f: dic = json.load(f) ticket_num = dic.get('ticket_num') print(f'用户[{user}]正在查看余票, 当前余票{ticket_num}张...') # 购买车票 def buy(user): with open('base_data', 'r', encoding='utf-8') as f: dic = json.load(f) time.sleep(1) if dic.get('ticket_num') > 0: dic['ticket_num'] -= 1 with open('base_data', 'w', encoding='utf-8') as f1: json.dump(dic, f1) print(f'用户[{user}]抢票成功!') else: print(f'用户[{user}]抢票失败!') # 开始抢票 def run(user, mutex): # 上锁 mutex.acquire() search(user) buy(user) # 解锁 mutex.release() if __name__ == '__main__': # 调用Lock()类得到一个锁对象 mutex = Lock() for i in range(10): # 并发开启10个子进程 p = Process(target=run, args=(f'{i}', mutex)) p.start() ''' 用户[0]正在查看余票, 当前余票1张... 用户[0]抢票成功! 用户[3]正在查看余票, 当前余票0张... 用户[3]抢票失败! 用户[2]正在查看余票, 当前余票0张... 用户[2]抢票失败! 用户[4]正在查看余票, 当前余票0张... 用户[4]抢票失败! 用户[8]正在查看余票, 当前余票0张... 用户[8]抢票失败! 用户[6]正在查看余票, 当前余票0张... 用户[6]抢票失败! 用户[7]正在查看余票, 当前余票0张... 用户[7]抢票失败! 用户[5]正在查看余票, 当前余票0张... 用户[5]抢票失败! 用户[1]正在查看余票, 当前余票0张... 用户[1]抢票失败! 用户[9]正在查看余票, 当前余票0张... 用户[9]抢票失败! '''
IPC
基本概念
- inter-process communication 进程间通信
- 进程间的数据是相互隔离的, 要想进行进程间的通信可以使用队列
队列
- 进程间通信的一种方式, 支持多进程传入和取出数据
- 遵循先进先出的原则
from multiprocessing import Queue q = Queue(5) # 队列中最多存放5个数据 # 填入数据 q.put('数据1') q.put('数据2') q.put('数据3') q.put('数据4') q.put('数据5') # q.put('数据6') # 数据填满了继续存放, 程序会被卡住 # 查看队列是否填满 print(q.full()) # 队列满了, 则会会报错 # q.put_nowait('数据6') # 获取数据, 若队列中无数据可获取, 程序会卡住 print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) # 队列中没有, 则会报错 # print(q.get_nowait()) # 判断队列是否为空 print(q.empty()) ''' True 数据1 数据2 数据3 数据4 数据5 True '''
生产者消费者模型
基本概念
- 生产者: 生产数据的
- 消费者: 使用数据
- 生产者消费者模型: 通过容器来解决生产者和消费者的之间的强耦合问题
代码实现
from multiprocessing import Process, Queue import time # 定义生产者 def producer(q): for i in range(5): data = f'包子{i}' q.put(data) print(f'生产了{data}') time.sleep(0.1) # 定义生产者 def consumer(q): while True: data = q.get() print(f'吃了{data}') if __name__ == '__main__': q = Queue() p1 = Process(target=producer, args=(q,)) p2 = Process(target=consumer, args=(q,)) p1.start() p2.start() print('主') ''' 主 生产了包子0 吃了包子0 生产了包子1 吃了包子1 生产了包子2 吃了包子2 生产了包子3 吃了包子3 生产了包子4 吃了包子4 '''
线程
基本概念
- 进程是资源单位, 线程才是CPU的执行单位, 进行运算调度的最小单位
- 线程包含在进程之中, 是进程中的实际运作单位
- 线程开销要远小于进程, 可以节省内存资源
- 线程之间共享进程中的数据
- 线程pid为主进程pid
创建线程
from threading import Thread import time # 方式一 def task(): print('线程开启') time.sleep(1) print('线程结束') if __name__ == '__main__': t = Thread(target=task) t.start() # 方式二 class MyThread(Thread): def run(self): print('线程开启') time.sleep(1) print('线程结束') if __name__ == '__main__': t = MyThread() t.start()
线程互斥锁
from threading import Thread, Lock import time n = 100 def task(i): print(f'线程{i}启动...') global n temp = n time.sleep(0.1) n = temp - 1 print(n) if __name__ == '__main__': for i in range(10): t = Thread(target=task, args=(i + 1,)) t.start() ''' 线程1启动... 线程2启动... 线程3启动... 线程4启动... 线程5启动... 线程6启动... 线程7启动... 线程8启动... 线程9启动... 线程10启动... 99 99 99 99 99 99 99 99 99 99 '''
from threading import Thread, Lock import time mutex = Lock() n = 100 def task(i): mutex.acquire() print(f'线程{i}启动...') global n temp = n time.sleep(0.1) n = temp - 1 print(n) mutex.release() if __name__ == '__main__': for i in range(10): t = Thread(target=task, args=(i + 1,)) t.start() ''' 线程1启动... 99 线程2启动... 98 线程3启动... 97 线程4启动... 96 线程5启动... 95 线程6启动... 94 线程7启动... 93 线程8启动... 92 线程9启动... 91 线程10启动... 90 '''
?