当前位置 : 主页 > 网络编程 > PHP >

Python中的多进程与进程池

来源:互联网 收集:自由互联 发布时间:2023-09-07
Python中的多进程与进程池 文章目录 ​​Python中的多进程与进程池​​ ​​1.multiprocessing类​​ ​​2.multiprocessing.Pool进程池​​ ​​构造方法​​ ​​常用方法​​ ​​多进程,多线


Python中的多进程与进程池

文章目录

  • ​​Python中的多进程与进程池​​
  • ​​1.multiprocessing类​​
  • ​​2.multiprocessing.Pool进程池​​
  • ​​构造方法​​
  • ​​常用方法​​
  • ​​多进程,多线程的选择​​
  • ​​应用​​
  • ​​Linux的特殊进程​​
  • ​​僵尸进程​​
  • ​​孤儿进程​​
  • ​​守护进程​​

1.multiprocessing类

方法

含义

current_process()

获取当前进程对象

类似于threading.current_thread()

active_children()

获取当前进程所有活动的子进程列表。注意:不包含自己,即当前进程。有点类似于threading.enumerate()

cpu_count()

返回当前系统CPU的数量

import multiprocessing

if __name__ == "__main__":
p = multiprocessing.Pool(4)
print(multiprocessing.current_process())
print(multiprocessing.active_children())
print(multiprocessing.cpu_count())

Python中的多进程与进程池_Python中的多进程与进程池

2.multiprocessing.Pool进程池

构造方法

Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) #构造方法

  • processes #进程池最大使用的工作进程数量,默认值为None,会使用os.cpu_count()来获取cpu的数量作为最大进程数量。
  • initializer #进程初始化函数,进程初始化时会调用该函数。
  • initargs #参数,为initializer函数调用时传递的参数。
  • maxtasksperchild #工作进程退出前可以完成的任务数,完成后用一个新的工作进程来替代原进程,让闲置的资源释放,默认是None,此意味只要Pool存在工作进程就一直存活(如果改为maxtasksperchild=1每次线程执行worker时都会重新创建一个新的进程并执行show)
  • context #用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。(context中有一个进程或者工作任务的队列,会产生阻塞,保证进程会永久存在。)
  • 注意Pool创建后,会立即实例化指定个数的进程。以备使用
import multiprocessing
import logging
import sys
import time

logging.basicConfig(level=logging.INFO,format="%(name)s %(process)d %(processName)s %(thread)d %(message)s",stream=sys.stdout)
log = logging.getLogger(__name__)

def show(i):
log.info("我启动了{}".format(i))

def worker(k):
time.sleep(2)
log.info("正在打印-{}".format(k))

if __name__ == "__main__":
pool = multiprocessing.Pool(processes=2,initializer=show,initargs=(8,),maxtasksperchild=None)
# # 如果改为maxtasksperchild=1每次线程执行worker时都会重新创建一个新的进程并执行show
#pool = multiprocessing.Pool(processes=2, initializer=show, initargs=(8,), maxtasksperchild=1)
time.sleep(3)
for i in range(4):
pool.apply(worker,args=(i+5,))

常用方法

名称

说明

apply(self,func,args=(),kwds={}])

柱塞执行,导致主进程执行其他进程就像一个个执行

func #要执行的函数

args #函数的位置参数

kwds #函数的关键字参数

apply_async(self,func,args=(),kwds={},callback=None,error_callback=None)

与apply方法用法一致,非阻塞异步执行,得到结果会执行回调函数callback,如果出现异常会执行异常回调函数error_callback。会返回一个将来的返回对象。

close(self)

关闭池,池不能再接受新的任务,所有任务完成后退出进程

terminate(self)

立即结束工作进程,不再处理未处理的任务

join(self)

主进程阻塞等待子进程的退出,join方法要在close或terminate后使用。

否则由于进程池的进程默认不会消亡,会永久阻塞

  • apply(self,func,args=(),kwds={}])->return #同步调用,会阻塞,一个个执行。相当于串行
  1. func #要执行的函数
  2. args #函数的位置参数
  3. kwds #函数的关键字参数
  4. return #返回函数执行的放回结果
import multiprocessing
import logging
import sys
import time

logging.basicConfig(level=logging.INFO,format="%(name)s %(process)d %(processName)s %(thread)d %(message)s",stream=sys.stdout)
log = logging.getLogger(__name__)

def worker(k):
time.sleep(2)
log.info("正在打印-{}".format(k))
return k+5

if __name__ == "__main__":
p = multiprocessing.Pool(4)
for i in range(5):
req = p.apply(func=worker,args=(i,))
log.info("{}~~~~~~~~".format(req))
print("-----------")
  • apply_async(self,func,args=(),kwds={},callback=None,error_callback=None)->multiprocessing.pool.ApplyResult #同步执行,返回结果是一个未来的结果对象
  1. func #要执行的函数
  2. args #函数的位置参数
  3. kwds #函数的关键字参数
  4. callback #回调函数,当函数执行完成后,回调执行的函数,会在主进程中创建一个线程来执行。
  5. error_callback #异常回调函数,如果发生异常后,会在主进程中创建一个线程执行。(一参的回调函数)
  6. multiprocessing.pool.ApplyResult 函数执行返回的结果对象。在未来的函数执行完成后,会对这个结果对象赋值。(一参的回调函数)
  • class multiprocessing.pool.AsyncResult类
  1. 由Pool.apply_async()和Pool.map_async()返回的结果的类。

方法

函数

get(self,itemout=None)

获取异步请求函数执行的返回结果。如果未执行完成,会产生一个阻塞。直到返回结果为止。

timeout #设置超时时长,如果指定时间内没有返回会抛出一个multiprocessing.context.TimeoutError异常

wait(self,timeout=None)

等待结果,会阻塞,直到结果返回。无论是否等到,都返回None。

timeout超时时长,默认为None表示无限等待。

ready(self)

返回的调用是否已经完成,完成返回True,否则返回False

successful(self)

返回的调用是否在没有引发异常的情况下完成。如果结果还未计算完成。会抛出AssertionError异常

import multiprocessing
import logging
import sys
import time

logging.basicConfig(level=logging.INFO,format="%(name)s %(process)d %(processName)s %(thread)d %(message)s",stream=sys.stdout)
log = logging.getLogger(__name__)

def worker(k):
time.sleep(2)
log.info("正在打印-{}".format(k))
return k+5

def callback(value):
log.info("{}\t type={}".format(value,type(value)))

if __name__ == "__main__":
p = multiprocessing.Pool(4)
reqs = []
for i in range(4):
req = p.apply_async(func=worker,args=(i,),callback=callback,error_callback=callback)
log.info("{}~~~~~~~~{}".format(req,type(req)))
reqs.append(req)
print("-----end------")
while True:
for i in reqs:
if not i.ready():
break
else:
for i in reqs:
log.info(i.get())
break
time.sleep(1)

多进程,多线程的选择

  1. CPU密集型
  • CPython中使用到了GIL,多线程的时候锁相互竞争,且多核优势不能发挥,选用Python多进程效率更高。
  1. IO密集型
  • 在Python中适合是用多线程,可以减少多进程间IO的序列化开销。且在IO等待的时候,切换到其他线程继续执 行,效率不错。

应用

  • 请求/应答模型:WEB应用中常见的处理模型
  • master启动多个worker工作进程,一般和CPU数目相同。发挥多核优势。
  • worker工作进程中,往往需要操作网络IO和磁盘IO,启动多线程,提高并发处理能力。worker处理用户的请求, 往往需要等待数据,处理完请求还要通过网络IO返回响应。这就是nginx工作模式。

Linux的特殊进程

在Linux/Unix中,通过父进程创建子进程。

僵尸进程

一个进程使用了fork创建了子进程,如果子进程终止进入僵死状态,而父进程并没有调用wait或者waitpid获取子进 程的状态信息,那么子进程仍留下一个数据结构保存在系统中,这种进程称为僵尸进程。
僵尸进程会占用一定的内存空间,还占用了进程号,所以一定要避免大量的僵尸进程产生。有很多方法可以避免僵 尸进程。

孤儿进程

父进程退出,而它的子进程仍在运行,那么这些子进程就会成为孤儿进程。孤儿进程会被init进程(进程号为1)收 养,并由init进程对它们完成状态收集工作。
init进程会循环调用wait这些孤儿进程,所以,孤儿进程没有什么危害。

守护进程

它是运行在后台的一种特殊进程。它独立于控制终端并周期性执行某种任务或等待处理某些事件。
守护进程的父进程是init进程,因为其父进程已经故意被终止掉了。
守护进程相对于普通的孤儿进程需要做一些特殊处理。


上一篇:【useradd】 选项 用户名
下一篇:没有了
网友评论