线程
一、前言
1、概述
- 线程
在一个进程的内部 要同时干很多件事 就需要同时执行多个子任务 我们把进程内的这些子任务称为 线程
线程通常叫轻型的进程 线程是共享内存空间的并发执行的多任务 每一个线程都共享一个进程的资源
线程是最小的执行单元 而进程最少由一个线程组成的
线程的执行时间 由进程决定的 - 主线程
任何进程都会有一个默认的主线程 如果主线程死掉 子线程也死掉 所以子线程依赖于主线程 - 模块
- _thread 低级模块
- threading 高级模块 对_thread进行了封装(建议)
二、线程使用
1、_thread模块
-
导入
import _thread -
开启线程
_thread.start_new_thread(函数,参数) -
注意
- 参数为元组类型
- 如果主线程死掉了 则子线程死掉
- 如果不传参 可以传递一个空元祖占位
-
实例
import _thread
import timedef fun(): for i in range(10): print(i,'---------------') time.sleep(1) for i in range(5): _thread.start_new_thread(fun, ()) for x in range(10): time.sleep(1) print('over')
线程_thread下方需要有阻塞代码 否则进程执行完毕 线程就死掉了
2、threading(建议)
-
导入
import threading -
创建线程
thr = threading.Thread(target=函数名, args=(参数,) ,name='名字')- target 函数名 要执行的任务
- args 参数 传递给任务的参数
- name 给线程起名称
-
设置线程名称
- 实例化的时候传递name参数
- thr.setName(名称)
-
开启线程
thr.start() -
线程等待
thr.join() -
返回当前线程对象
- threading.current_thread()
- threading.currentThread()
-
获取线程名称
- threading.current_thread().name
- threading.current_thread().getName()
-
获取所有线程的名称
threading.enumerate() [ɪˈnjuːməreɪt] -
设置线程守护
t.setDaemon(True) # 线程守护 是否随着主线程的结束而结束 -
实例
import threading
import timedef fun(i): print('线程开始', threading.current_thread().name) time.sleep(1) if __name__ == '__main__': print(threading.current_thread().name) t = threading.Thread(target=fun, args=(1,)) t.setName('name'+str(i)) # 通过setname方法进行设置线程名称 t.start() # 开启线程 t_list.append(t) # 循环等待 i.join() print('over')
3、开启多线程
import threading
import time
def fun(i):
print('线程开始', threading.current_thread().name)
time.sleep(1)
if __name__ == '__main__':
print(threading.current_thread().name)
t_list = []
for i in range(5):
# 实例化并设定线程名称
# t = threading.Thread(target=fun, args=(1,), name='name-'+str(i))
t = threading.Thread(target=fun, args=(1,))
t.setName('name'+str(i)) # 通过setname方法进行设置线程名称
t.start() # 开启线程
t_list.append(t)
# 循环等待
for i in t_list:
i.join()
print('over')
4、类的继承 创建线程
import threading
class MyThread(threading.Thread):
def __init__(self, arg, name):
threading.Thread.__init__(self)
self.i = arg
self.name = name
def run(self):
print('我是类的继承实现的线程', self.i, self.name)
t = MyThread(1, 2)
t.start()
t.join()
5、线程共享数据
-
概述
多线程和多进程最主要的区别在于 多进程 中 同一个变量 会被拷贝到每一个进程中 互不影响
而多线程中 所有的变量都是线程间共享的 所以 任意一个变量都可以被线程进行修改 原因就是在于线程中变量共享的 -
数据共享
import threading
import time
name = 'lucky'def fun1(): global name name = '张三' print(threading.current_thread().name, name) time.sleep(2) def fun2(): print(threading.current_thread().name, name) if __name__ == '__main__': t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print(name) print('over')
-
线程冲突
import threadingi = 1 def change_num(num): global i i += num i -= num def fun1(): for i in range(10000000): change_num(6) def fun2(): for i in range(10000000): change_num(8) if __name__ == '__main__': t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print(i) print('over')
两个线程同时对同一数据进行读写就会产生数据的错乱 解决办法 在进行写的时候保证只有一个线程
6、Lock线程锁(数据错乱的问题)
-
概述
lock锁是线程模块中的一个类 有俩个主要的方法 acquire() 和release() 当调用acquire()时 它进行锁定 并阻塞执行 直到调用release() 以防止数据损坏 因为一次只有一个线程进行访问资源 -
作用
避免线程冲突 -
注意
- 当前的线程锁定以后 后面的线程会等待 (线程等待/线程阻塞)
- 需要release以后才能解锁恢复正常
- 不能重复锁定
-
内存错乱
import threading, timei = 1 def fun1(): global i for x in range(1000000): i += x i -= x print('fun1----------', i) def fun2(): global i for x in range(10000000): i += x i -= x print('fun2-------', i) if __name__ == '__main__': t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join()
俩个线程对统一资源进行读写 可能会造成数据值的错乱我么必须保证一个线程在修改的同时其他线程不能进行操作
-
线程锁的简单使用
import threading
# 创建线程锁
lock = threading.Lock()
# 进行锁定
lock.acquire()
# 进行解锁
lock.release() -
死锁
import threadinglock = threading.Lock() a = 0 def sumOne(): global a lock.acquire() a += 1 # lock.release() print(a) def sumTwo(): global a print('two', a) lock.acquire() a += 1 lock.release() sumOne() sumTwo() print(a)
-
解决上面冲突问题
import threading, time
lock = threading.Lock()
i = 1
def fun1():
global i
if lock.acquire():
for x in range(1000000):
i += x
i -= x
lock.release()
print('fun1----------', i)def fun2(): global i if lock.acquire(): for x in range(10000000): i += x i -= x lock.release() print('fun2-------', i) if __name__ == '__main__': t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print(i)
-
Lock的简写
with lock:
pass -
上面进行更改
import threading, time
lock = threading.Lock()
i = 1
def fun1():
global i
with lock:
for x in range(1000000):
i += x
i -= x
print('fun1----------', i)def fun2(): global i with lock: for x in range(10000000): i += x i -= x print('fun2-------', i) if __name__ == '__main__': t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print(i)
7、RLock 避免单线程死锁
-
说明
上面的Lock不能识别Lock是被哪个线程所持有的 如果任何线程正在访问共享资源 那么试图访问共享资源的其他线程将被阻塞 即使锁定共享资源的线程也是如此 如果使用RLock 就不会出现这种问题 RLock可以识别当前线程 如果是当前线程锁定 则不会出现死锁也就是阻塞的问题 -
作用
在同一线程内 进行多次acquire 不会被阻塞 -
线程锁的简单使用
import threading
# 创建线程锁
rlock = threading.RLock()
# 进行锁定
rlock.acquire()
# 进行解锁
rlock.release() -
同一线程多次锁定
import threading
rlock = threading.RLock()
a = 0def sumOne(): global a rlock.acquire() a += 1 rlock.acquire() rlock.release() print(a) sumOne() print(a)
-
解决冲突问题
import threading, time
rlock = threading.RLock()
i = 1
def fun1():
global i
if rlock.acquire():
for x in range(1000000):
i += x
i -= x
rlock.release()
print('fun1----------', i)def fun2(): global i if rlock.acquire(): for x in range(10000000): i += x i -= x rlock.release() print('fun2-------', i) if __name__ == '__main__': t1 = threading.Thread(target=fun1) t2 = threading.Thread(target=fun2) t1.start() t2.start() t1.join() t2.join() print(i)
-
rlock的简写
rlock = threading.RLock()
with rlock:
...
8、Local
-
概述
ThreadLocal 有的称呼为线程本地变量 也有称呼为线程本地存储
ThreadLocal 在每一个变量中都会创建副本 每个线程都可以访问自己内部副本的变量 -
作用
它本身是一个全局变量 但是每个线程可以利用它来保护自己的私有数据 这些私有数据对其它的线程是不可见的 -
导入
import threading
local = threading.local() -
使用
import threadinglocal = threading.local() def run(n): local.x += n local.x -= n def fun(): # 设定local.x的初始值 local.x = 0 for i in range(10000000): run(i) print(threading.current_thread().name, local.x) if __name__ == '__main__': t1 = threading.Thread(target=fun) t2 = threading.Thread(target=fun) t1.start() t2.start() t1.join() t2.join()
9、Timer定时执行
-
概述
Timer是Thread的子类 可以指定某段时间间隔后在执行某个操作 -
使用
import threadingdef go(): print('小老弟 你啥时候能看见我?') # 开启定时任务 t = threading.Timer(5, go) t.start() print('over')
10、线程通信(触发动作)
-
作用
线程间传递信号 -
使用
import threading
e = threading.Event()
e.wait() # 线程等待 等待一次
e.clear() # 将wait重置 再次等待
e.set() # 触发wait -
实例
import threading
import timedef run(): # 创建了一个event对象 e = threading.Event() def go(): e.wait() # 阻塞等待 print('我的心在等待 永远在等待') threading.Thread(target=go).start() return e e = run() time.sleep(3) e.set() # 触发
-
触发一次执行一次
import threadingdef run(e): for i in range(5): e.wait() print('---------------', i) e.clear() if __name__ == '__main__': e = threading.Event() t = threading.Thread(target=run, args=(e, )) t.start() for i in range(5): v = input() if v == 'y': e.set()
11、信号量
-
作用
用于控制线程的数量 -
概述
信号量semaphore [ˈseməfɔːr]
管理一个内置的计数器 每次调用acquire的时候计数器减一
当调用release的时候 内置计数器加1
计数器不能小于0 当计数器为0时 acquire将进行阻塞 直到调用release -
使用
import threading
import timedef go(): if s.acquire(): print('go go go 哦来哦哦来哦来') time.sleep(1) s.release() if __name__ == '__main__': s = threading.Semaphore(2) # 限制并发数为2 for i in range(5): threading.Thread(target=go).start()
-
简写
s = threading.Semaphore(2) # 限制并发数为2
with s:
... -
import threading import time def go(): # if s.acquire(): with s: print('go go go 哦来哦哦来哦来') time.sleep(1) # s.release() if __name__ == '__main__': s = threading.Semaphore(2) # 限制并发数为2 for i in range(5): threading.Thread(target=go).start()
12、ThreadPool 线程池(老版本)
-
安装
pip install threadpool -
概述
启动大量线程 -
导入
import threadpool -
使用
- 创建线程池
threadpool.ThreadPool() - 创建线程池任务
threadpool.makeRequests(go, name_list) - 将线程池任务放到线程池中
for i in req:
pool.putRequest(i)
[pool.putRequest(i) for i in req] - 阻塞等待
pool.wait()
- 创建线程池
-
实例
import threadpool
import timedef go(name): print('人生苦短 {}:快用python 一行代码就见效'.format(name)) time.sleep(2) name_list = ['大伟', '老曹', '什么雨', '却什么黑'] # 创建线程池 并发3 pool = threadpool.ThreadPool(3) # 创建线程池任务 req = threadpool.makeRequests(go, name_list) # for i in req: # pool.putRequest(i) [pool.putRequest(i) for i in req] # 循环放到线程池中 pool.wait() # 阻塞等待
13、线程池新模块
-
导入
from concurrent.futures import ThreadPoolExecutor[ɪɡˈzekjətər] -
方法
- submit() 传入函数及参数
- map() 统一传参管理
-
使用
from concurrent.futures import ThreadPoolExecutor
import time
8def go(name): print('人生苦短 {}:快用python 一行代码就见效'.format(name)) time.sleep(2) if __name__ == '__main__': name_list = ['大伟', '老曹', '什么雨', '却什么黑'] pool = ThreadPoolExecutor(3) # for n in name_list: # pool.submit(go, n) pool.map(go, name_list) print('over')
14、队列模块
- 导入
import queue - 概述
队列模块是python标准库中线程安全队列 (FIFO)first in first out 的缩写 先进先出 用来存储生产者和消费者之间的信息传递 - 格式
queue.Queue(maxsize=0)
当参数为0 或者小于0 队列大小没有限制
如果为其它整数 则代表当前队列的限制数 如果超出限定的数 则阻塞 - 简单使用
import queue
# 实例化队列
q = queue.Queue()
# q = queue.Queue(3)
# 循环将数据放入队列
for i in range(5):
q.put(i)
# 循环取出队列数据
while not q.empty():
print(q.get())
15、生产者消费者
import threading
import queue
import time, random
class createThread(threading.Thread):
def __init__(self, que, name):
threading.Thread.__init__(self)
self.que = que
self.name = name
def run(self):
while True:
time.sleep(3) # 3秒钟生产一个
num = random.randint(1, 1000000)
self.que.put('生产者:'+str(self.name)+'商品:'+str(num))
print('生产者:'+str(self.name)+'商品:'+str(num))
class consumerThread(threading.Thread):
def __init__(self, que, name):
threading.Thread.__init__(self)
self.que = que
self.name = name
def run(self):
while True:
time.sleep(1) # 1秒钟消费一个
data = self.que.get()
print('客户:'+str(self.name)+'买到了:'+data)
if __name__ == '__main__':
que = queue.Queue(10)
for i in range(3):
createThread(que, i).start()
for i in range(7):
consumerThread(que, i).start()
三、线程VS进程
1、多任务原理
首先 要实现多任务 通常我们会设计 Master - Worker模式 Master负责分配任务 worker负责执行任务 因此 多任务环境下 通常是一个master 多个 worker
多进程实现Master worker 主进程则为Master 其它进程则为worker
多线程实现Master worker 主线程为Master 其它线程为worker
2、多进程
- 优点
稳定性高 :多进程的最大优点为 稳定性高 如果多个进程中有一个子进程崩溃了 则不会影响到其它的进程
Apache 就是使用的多进程 - 缺点
创建进程的代价比较大 针对于Windows Unix/Linux下影响不大的 所以说在Windows下开启多进程开销比较大
操作系统运行的进程数是有限的
3、多线程
- 优点
多线程的执行效率高于多进程 - 缺点
任何一个线程崩溃(挂掉) 会影响整个进程的崩溃 所有线程共享存储空间的
四、协程
1、概念
-
协程
又称微线程 是一种轻量级的线程 -
子程序
在所有语言中 都是层级调用的 比如 在A中调用B B在执行过程中调用了C C执行完进行返回 B执行完返回 最后是A执行完毕 一个函数就是执行的子程序 子程序的调用 总会有一个入口 一次返回 调用的顺序是明确的 -
理解协程
之前的线程 是由操作系统进行调度的 协程按照程序员按照需求自己调度的 -
实现
python对协程的实现是通过生成器 generator来实现的
在生成器中 不但可以作用于for循环, 还可以作用于next方法 -
优点
协程的效率极高 因为子程序 不需要线程或者进程的切换 也没有额外的开销
不需要多线程的锁机制 -
缺点
无法利用多核cpu 协程的本质是单线程 -
send方法
将值发送给生成器中的yield 进行接收 并且代码向下执行(send实现了next的方法) -
注意
协程的启动 需要send(None) -
数据的传递
# send()def run(): data = '123' r = yield data print('代码1', r) r = yield data print('代码2', r) r = yield data r = run() # print(next(r)) # print(next(r)) # print(next(r)) r.send(None) # 协程启动 None print(r.send('a')) print(r.send('b'))
-
生产者和消费者
传统的生产者和消费者 是一个线程写消息 另一个读取消息
如果改用协程的话 就由生成器来进行实现
def product(c):
c.send(None)
for i in range(10):
c.send(i)
print('生产了数据:', i)
# res = c.send(i)
# print('生产了数据:', i, res)def con(): data = '' while True: r = yield data print('消费了:{}数据'.format(r)) if __name__ == '__main__': c = con() product(c) # print(c)