当前位置 : 主页 > 编程语言 > python >

python多线程,多进程

来源:互联网 收集:自由互联 发布时间:2021-06-25
threading. active_count ( ) 返回当前存活的线程类 Thread 对象。返回的计数等于 enumerate() 返回的列表长度。 threading. current_thread ( ) 返回当前对应调用者的控制线程的 Thread 对象。如果调用者的

threading.active_count()

返回当前存活的线程类 Thread 对象。返回的计数等于 enumerate() 返回的列表长度。

threading.current_thread()

返回当前对应调用者的控制线程的 Thread 对象。如果调用者的控制线程不是利用 threading 创建,会返回一个功能受限的虚拟线程对象。

threading.get_ident()

 

返回当前线程的 “线程标识符”。它是一个非零的整数。它的值没有直接含义,主要是用作 magic cookie,比如作为含有线程相关数据的字典的索引。线程标识符可能会在线程退出,新线程创建时被复用。

threading.enumerate()

以列表形式返回当前所有存活的 Thread 对象。 该列表包含守护线程,current_thread() 创建的虚拟线程对象和主线程。它不包含已终结的线程和尚未开始的线程。

threading.main_thread()

 

返回主 Thread 对象。一般情况下,主线程是Python解释器开始时创建的线程。

threading.setprofile(func)

为所有 threading 模块开始的线程设置性能测试函数。在每个线程的 run() 方法被调用前,func 会被传递给 sys.setprofile() 。

threading.stack_size([size])

返回创建线程时用的堆栈大小。可选参数 size 指定之后新建的线程的堆栈大小,而且一定要是0(根据平台或者默认配置)或者最小是32,768(32KiB)的一个正整数。如果 size 没有指定,默认是0。如果不支持改变线程堆栈大小,会抛出 RuntimeError 错误。如果指定的堆栈大小不合法,会抛出 ValueError 错误并且不会修改堆栈大小。32KiB是当前最小的能保证解释器有足够堆栈空间的堆栈大小。需要注意的是部分平台对于堆栈大小会有特定的限制,例如要求大于32KiB的堆栈大小或者需要根据系统内存页面的整数倍进行分配 - 应当查阅平台文档有关详细信息(4KiB页面比较普遍,在没有更具体信息的情况下,建议的方法是使用4096的倍数作为堆栈大小)。

threading.TIMEOUT_MAX

阻塞函数( Lock.acquire()RLock.acquire()Condition.wait(), ...)中形参 timeout 允许的最大值。传入超过这个值的 timeout 会抛出 OverflowError 异常。

有个 "主线程" 对象;这对应Python程序里面初始的控制线程。它不是一个守护线程。

"虚拟线程对象" 是可以被创建的。这些是对应于“外部线程”的线程对象,它们是在线程模块外部启动的控制线程,例如直接来自C代码。虚拟线程对象功能受限;他们总是被认为是存活的和守护模式,不能被 join() 。因为无法检测外来线程的终结,它们永远不会被删除。

class  threading. Thread (group=Nonetarget=Nonename=Noneargs=()kwargs={}*daemon=None)

调用这个构造函数时,必需带有关键字参数。参数如下:

group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。

target 是用于 run() 方法调用的可调用对象。默认是 None,表示不需要调用任何方法。

name 是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其中 N 是小的十进制数。

args 是用于调用目标函数的参数元组。默认是 ()

kwargs 是用于调用目标函数的关键字参数字典。默认是 {}

如果 daemon 不是 None,线程将被显式的设置为 守护模式,不管该线程是否是守护模式。如果是 None (默认值),线程将继承当前线程的守护模式属性。

如果子类型重载了构造函数,它一定要确保在做任何事前,先发起调用基类构造器(Thread.__init__())。

在 3.3 版更改: 加入 daemon 参数。

start ()

开始线程活动。

它在一个线程里最多只能被调用一次。它安排对象的 run() 方法在一个独立的控制进程中调用。

如果同一个线程对象中调用这个方法的次数大于一次,会抛出 RuntimeError 。

run ()

代表线程活动的方法。

你可以在子类型里重载这个方法。 标准的 run() 方法会对作为 target 参数传递给该对象构造器的可调用对象(如果存在)发起调用,并附带从 args 和 kwargs 参数分别获取的位置和关键字参数。

join (timeout=None)

等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 -- 不管是正常终结还是抛出未处理异常 -- 或者直到发生超时,超时选项是可选的。

当 timeout 参数存在而且不是 None 时,它应该是一个用于指定操作超时的以秒为单位的浮点数(或者分数)。因为 join() 总是返回 None ,所以你一定要在 join() 后调用 is_alive() 才能判断是否发生超时 -- 如果线程仍然存货,则 join() 超时。

当 timeout 参数不存在或者是 None ,这个操作会阻塞直到线程终结。

一个线程可以被 join() 很多次。

如果尝试加入当前线程会导致死锁, join() 会引起 RuntimeError 异常。如果尝试 join() 一个尚未开始的线程,也会抛出相同的异常。

name

只用于识别的字符串。它没有语义。多个线程可以赋予相同的名称。 初始名称由构造函数设置。

getName ()
setName ()

旧的 name 取值/设值 API;直接当做特征属性使用它。

ident

这个线程的 ‘线程标识符‘,如果线程尚未开始则为 None 。这是个非零整数。参见 get_ident() 函数。当一个线程退出而另外一个线程被创建,线程标识符会被复用。即使线程退出后,仍可得到标识符。

is_alive ()

返回线程是否存活。

当 run() 方法刚开始直到 run() 方法刚结束,这个方法返回 True 。模块函数 enumerate() 返回包含所有存活线程的列表。

daemon

一个表示这个线程是(True)否(False)守护线程的布尔值。一定要在调用 start() 前设置好,不然会抛出 RuntimeError 。初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是 daemon = False

当没有存活的非守护线程时,整个Python程序才会退出。

isDaemon ()
setDaemon ()

旧的 name 取值/设值 API;建议直接当做特征属性使用它。

#线程是cpu最小的单元

from threading import Thread,active_count,current_thread,enumerate,main_thread,stack_size,Lock
import time
import random

# print(active_count()) #返回当前的线程数
# print(current_thread()) #返回当前的线程Thread对象
# print(enumerate()) #反回当前活动的线程
# print(main_thread()) #返回当前的主线程
# print(stack_size()) #返回创建线程时使用的栈的大小,如果size参数,则用来指定后续创建的线程使用的栈的大小,size必须是0,(表示使用系统默认的值)大于32k的正整数

#
arr = []
def task(num):
    time.sleep(3)
    print(num)
#
start = time.time()

for i in range(0,5):
    t =Thread(target=task,args=(i,))
    t.start()
    arr.append(t)
    # print(enumerate())
    # t.join()

for j in arr:
    j.join()

end = time.time()
print(时间:%s%(end-start))

‘‘‘
Thread对象常用方法:
start()运行线程
join()阻塞主线程 time阻塞时间
name 线程的名字
is_alive() 判断线程是否存活
ident 线程的标识
daemon 是否为守护进程
‘‘‘

# class MyI(Thread):
#     def __init__(self):
#         super(MyI,self).__init__()
#     def run(self):
#         time.sleep(3)
#         print(self.name,self.ident,self.daemon)
#
# t1 = MyI()
# t1.name = "任务一"
# # t1.daemon=True
# t1.start()

#
# h2=[]
# l = Lock()
# class xm(Thread):
#
#     def __init__(self):
#         super(xm,self).__init__()
#         self.h = True
#     def run(self):
#
#         if len(h2)<5:
#             l.acquire()
#             while self.h:
#                 h2.append(1)
#                 print(‘小明往锅里加了%s‘%len(h2)+‘个丸子‘,‘锅里有%s‘%len(h2)+‘个丸子‘)
#                 if len(h2)==5:
#                     l.release()
#                     self.h = False
#
# class xh(Thread):
#     def __init__(self):
#         super(xh,self).__init__()
#         self.h = True
#
#     def run(self):
#         i = 0
#         if len(h2)==5:
#             l.acquire()
#             while self.h:
#                 i+=1
#                 h2.pop(0)
#                 print(‘小红吃了锅里的%s‘%(i)+‘个丸子‘,‘锅里有%s‘%len(h2)+‘个丸子‘)
#                 if len(h2)==0:
#                     l.release()
#                     self.h = False
#
#
# x1 = xm()
# x1.start()
#
# x2 = xh()
# x2.start()
#
# ‘‘‘
# def m(*args):
#     print(random.randint(1,args))
# Thread(group=None,target=m(),args=(1,2,34,6)) #构造函数
# target 线程启动执行函数
# args 是元组 是线程执行函数的参数
# kwargs 是线程执行函数的参数
# ‘‘‘

multiprocessing 是一个用与 threading 模块相似API的支持产生进程的包。 multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。因此, multiprocessing 模块允许程序员充分利用机器上的多个核心。Unix 和 Windows 上都可以运行。

multiprocessing 模块还引入了在 threading 模块中没有类似物的API。这方面的一个主要例子是 Pool 对象,它提供了一种方便的方法,可以跨多个输入值并行化函数的执行,跨进程分配输入数据(数据并行)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用 Pool ,

from multiprocessing import Pool def f(x): return x*x if __name__ == ‘__main__‘: with Pool(5) as p: print(p.map(f, [1, 2, 3])) 

将打印到标准输出

[1, 4, 9]

Process 类

在 multiprocessing 中,通过创建一个 Process 对象然后调用它的 start() 方法来生成进程。 Process 和 threading.Thread API 相同。 一个简单的多进程程序示例是:

from multiprocessing import Process def f(name): print(‘hello‘, name) if __name__ == ‘__main__‘: p = Process(target=f, args=(‘bob‘,)) p.start() p.join() 

要显示所涉及的各个进程ID,这是一个扩展示例:

from multiprocessing import Process import os def info(title): print(title) print(‘module name:‘, __name__) print(‘parent process:‘, os.getppid()) print(‘process id:‘, os.getpid()) def f(name): info(‘function f‘) print(‘hello‘, name) if __name__ == ‘__main__‘: info(‘main line‘) p = Process(target=f, args=(‘bob‘,)) p.start() p.join()

POOL进程池
# from multiprocessing import Pool
# def fun():
# sum = 0
# for i in range(0,100000):
# sum+=i
# print(sum)
#
# if __name__ == ‘__main__‘:
# pool = Pool(5) #5个进程的进程池
# start = time.time()
# for i in range(100):
# # pool.apply(fun)
# obj = pool.apply_async(fun)
# print(obj)
#
# end =time.time()
# print(end-start)

 
       
       

队列

Queue 类是一个近似 queue.Queue 的克隆。 例如:

from multiprocessing import Process, Queue def f(q): q.put([42, None, ‘hello‘]) if __name__ == ‘__main__‘: q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, ‘hello‘]" p.join() 

队列是线程和进程安全的。

管道

Pipe() 函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:

from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, ‘hello‘]) conn.close() if __name__ == ‘__main__‘: parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, ‘hello‘]" p.join() 

返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send() 和 recv() 方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。

 
       
       

进程之间的同步

multiprocessing 包含来自 threading 的所有同步基本体的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print(‘hello world‘, i) finally: l.release() if __name__ == ‘__main__‘: lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start() 

不使用来自不同进程的锁输出容易产生混淆。

 
       
       

在进程之间共享状态

如上所述,在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。

但是,如果你真的需要使用一些共享数据,那么 multiprocessing 提供了两种方法。

共享内存

可以使用 Value 或 Array 将数据存储在共享内存映射中。例如,以下代码:

from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == ‘__main__‘: num = Value(‘d‘, 0.0) arr = Array(‘i‘, range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) 

将打印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9] 

创建 num 和 arr 时使用的 ‘d‘ 和 ‘i‘ 参数是 array 模块使用的类型的 typecode : ‘d‘ 表示双精度浮点数, ‘i‘ 表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,该模块支持创建从共享内存分配的任意ctypes对象。

服务器进程

由 Manager() 返回的管理器对象控制一个服务器进程,该进程保存Python对象并允许其他进程使用代理操作它们。

Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。例如

from multiprocessing import Process, Manager def f(d, l): d[1] = ‘1‘ d[‘2‘] = 2 d[0.25] = None l.reverse() if __name__ == ‘__main__‘: with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l) 

将打印

{0.25: None, 1: ‘1‘, ‘2‘: 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] 

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。

服务端的创建
from multiprocessing.managers import BaseManager
from queue import Queue
queue = Queue()
queue.put(‘lolololol‘)
class QueueManager(BaseManager): pass
QueueManager.register(‘get_queue‘, callable=lambda:queue)
m = QueueManager(address=(‘127.0.0.1‘, 5000), authkey=b‘2ab‘)
s = m.get_server()
s.serve_forever() #创建服务器

客户端的创建
from multiprocessing.managers import BaseManagerclass QueueManager(BaseManager): passQueueManager.register(‘get_queue‘)m = QueueManager(address=(‘127.0.0.1‘, 5000), authkey=b‘2ab‘)m.connect()queue = m.get_queue()print(queue.get())
网友评论