平时也就使用 python 的处理数据的几个包做数据处理,很少使用 python 的一些其他的功能,之前一直想搞懂 python 自带的多线程/多进程包,反复看了很多次,也不太会用。
今天在家好好研究了一遍,不负有心人,终于可以用起来了:大概知道这个包怎么用,怎么按照我的想法用。我一直看的都是 python 的 multiprocessing 包。这次也是我的个人学习积累。喜欢记得收藏、点赞、关注。
【注】完整代码、数据、技术交流,文末获取。
什么是 multiprocessing
python 的官方文档是这么说的:multiprocessing 是一个支持使用与 threading 模块类似的 API 来产生进程的包。multiprocessing 包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。它在 Unix 和 Windows 上均可运行。
multiprocessing 模块还引入了在 threading 模块中没有的API。一个主要的例子就是 Pool 对象,它提供了一种快捷的方法,赋予函数并行化处理一系列输入值的能力,可以将输入数据分配给不同进程处理(数据并行)。下面的例子演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用了 Pool
其实在我眼里,我不在乎multiprocessing是多线程或者多进程,只要能充分利用cpu多核性能、提高我的计算效率就行。
使用 multiprocessing 目的
我们用这个包要实现什么功能
上面就是我的目的,和我的需求;如果都达到了,那我就觉得是完成了。
创建一个场景
我们现在有一个非常耗时的函数,是计算一个值的平方:2 => 2 * 2,我们设置一个函数,每次计算都要休息1秒。那么在python中可以是这么写:
def myf(x):time.sleep(1)
return x * x
如果我们不使用pandas、numpy包。我们可能需要这样写:
# 在不使用numpy的时候,进行简单的并行计算import time
from tqdm import tqdm
def myf(x):
time.sleep(1)
return x * x
if __name__ == '__main__':
listx = range(10)
listy = []
for i in tqdm(listx):
temp_value = myf(x=i)
listy.append(temp_value)
print('final y:')
print(listy)
一个简单的并行计算
我们使用multiprocessing的Pool。Pool对象可以自动的将数据分享给不同进程。啥都不用管,只要传递进入数据就行。在下面的代码中,我需要计算0到19的平方,我就设置了一个Pool对象,这个对象有4个进程,我把需要的计算的函数传递进去,需要被计算的可迭代的对象传递进去。然后数据就出来了。
⚠️:这里使用的是Pool的map函数
# 遇到一个大的数据,一个比较复杂的计算,使用并行,可以极大的提高计算效率from multiprocessing import Pool
import time
from tqdm import tqdm
def myf(x):
time.sleep(1)
return x * x
if __name__ == '__main__':
print('-- start run ---')
value_x = range(20)
P = Pool(processes=4)
value_y = P.map(func=myf, iterable=value_x)
print(value_y)
实际上,计算还是很快的,但是我中间遇到一个情况,就是我不知道这个计算到什么程度了,是计算到50%了,还是计算到80%?这个不清楚。
我想要展示计算进度
为了展示多核计算的进度条,可以这么做:
⚠️:这里使用的是Pool的imap_unordered函数
# 相对于02,这个可以有个变动的进度条,进度条不断的衍生,但是缺点也很明显,缺少进度完成情况。from multiprocessing import Pool
import time
from tqdm import tqdm
def myf(x):
time.sleep(1)
return x * x
if __name__ == '__main__':
# print('-- start run ---')
value_x = range(20)
P = Pool(processes=4)
value_y = list(tqdm(P.imap_unordered(func=myf, iterable=value_x)))
print(value_y)
这个虽然可以看到进度条不断的完成,但是没有办法看到完成率。
展示计算完成率
⚠️:这里使用的是Pool的apply_async函数
这里的apply_async需要的就不是一个迭代对象了,和上面的map、imap_unordered就有明显的差距,我们需要把一个迭代对象拿出来,放到apply_async里面,然后再将apply_async放到列表里面,然后在把这个东西从列表里面取出来。通过get调用他。
from multiprocessing import Poolfrom tqdm import tqdm
import time
def myf(x):
time.sleep(1)
return x * x
if __name__ == '__main__':
value_x= range(200)
P = Pool(processes=4)
# 这里计算很快
res = [P.apply_async(func=myf, args=(i, )) for i in value_x]
# 主要是看这里
result = [i.get(timeout=2) for i in tqdm(res)]
print(result)
当然,我看还有人用apply_async的callback参数,和上面的对比,感觉就有点麻烦,这里展示给各位:
from multiprocessing import Poolfrom tqdm import tqdm
import time
def myf(x):
time.sleep(1)
return x * x
if __name__ == '__main__':
value_x= range(200)
P = Pool(processes=20)
pbar = tqdm(total=len(value_x))
# 这里计算很快
res = [P.apply_async(func=myf, args=(i, ), callback= lambda _: pbar.update(1)) for i in value_x]
# 主要是看这里
result = [i.get(timeout=2) for i in res]
print(result)
处理异常
多核计算最经常遇到的就是遇到一个错误,然后就跳出来,这怎么可以忍。就拿最常见的错误来说,函数运行超时怎么解决?
⚠️:这里使用的是Pool的apply_async函数和get来解决
from multiprocessing import Pool, TimeoutErrorimport time
from tqdm import tqdm
def myf(x):
if x % 5 == 0:
time.sleep(20.2)
else:
time.sleep(0.3)
return x * x
def safely_get(value, timeout=2):
try:
data = value.get(timeout=timeout)
except TimeoutError:
data = 0
return data
if __name__ == '__main__':
P = Pool(processes=10)
value = range(100)
pbar = tqdm(total=len(value))
# way 2
res_temp = [P.apply_async(func=myf, args=(i,), callback=lambda _: pbar.update(1)) for i in value]
# result = [res.get(timeout=3) for res in res_temp]
result = [safely_get(res, timeout=1) for res in res_temp]
# way 1
# res_temp = [P.apply_async(func=myf, args=(i,)) for i in value]
# result = [safely_get(res, timeout=1) for res in tqdm(res_temp)]
time.sleep(1)
print(result)
我把这个myf函数做了简单的处理,如果是5的倍速,就需要设置休息20.2秒,但是我的容忍度是每个函数运行时间不能超过1秒,所以我写了一饿过safely_get函数,这个函数里面有try,可以破获错误,如果超时,那么整个函数也不跳出,并且把结果返回为0。
多个进程修改同一个数据
来都来了,也把这个问题说一下:
由于python的GIL,导致同一时间,不同的进程不能同时修改同一个数据。但是使用multiprocessing包的Manager的dict,list之类的就可以。
下面这两个代码就是用来将子进程的相关信息保存到一个列表里面。然后保存到pandas里面。
使用Process做的:代码如下:
# multi sub process write data to one data (python dict)from multiprocessing import Process, Manager
import os
import time
import pandas as pd
from tqdm import tqdm
def worker(id, save_data):
time.sleep(1)
save_data[id] = {
'子进程': [os.getpid()],
'父进程': [os.getppid()],
'进程id': [id]
}
if __name__ == "__main__":
finaldata = Manager().dict()
subprocess_list = []
for i in tqdm(range(200)):
p = Process(target=worker, args=(i, finaldata))
subprocess_list.append(p)
p.start()
[p.join() for p in tqdm(subprocess_list)]
finaldata = pd.concat([pd.DataFrame(value) for (key, value) in finaldata.items()])
print(finaldata)
使用Pool做的:
# multi sub process write data to one data (python dict)# use Pool
from multiprocessing import Pool, Manager
import os
import time
import pandas as pd
from tqdm import tqdm
def worker(id, save_data):
time.sleep(1)
save_data[id] = {
'子进程': [os.getpid()],
'父进程': [os.getppid()],
'进程id': [id]
}
if __name__ == "__main__":
finaldata = Manager().dict()
P = Pool(processes=20)
# reslist = []
# for i in tqdm(range(200)):
# res = P.apply_async(func=worker, args=(i, finaldata))
# reslist.append(res)
reslist = [P.apply_async(func=worker, args=(i, finaldata)) for i in range(200)]
[res.get(timeout=200) for res in tqdm(reslist)]
finaldata = pd.concat([pd.DataFrame(value) for (key, value) in finaldata.items()])
print(finaldata)
技术交流
欢迎转载、收藏、有所收获点赞支持一下!数据、代码可以找我获取
目前开通了技术交流群,群友已超过2000人,添加时最好的备注方式为:来源+兴趣方向,方便找到志同道合的朋友
- 方式、微信搜索公众号:Python学习与数据挖掘,后台回复:加群