threading.
active_count
()
返回当前存活的线程类 Thread
对象。返回的计数等于 enumerate()
返回的列表长度。
threading.
current_thread
()
返回当前对应调用者的控制线程的 Thread
对象。如果调用者的控制线程不是利用 threading
创建,会返回一个功能受限的虚拟线程对象。
threading.
get_ident
()
返回当前线程的 “线程标识符”。它是一个非零的整数。它的值没有直接含义,主要是用作 magic cookie,比如作为含有线程相关数据的字典的索引。线程标识符可能会在线程退出,新线程创建时被复用。
threading.
enumerate
()
以列表形式返回当前所有存活的 Thread
对象。 该列表包含守护线程,current_thread()
创建的虚拟线程对象和主线程。它不包含已终结的线程和尚未开始的线程。
threading.
main_thread
()
返回主 Thread
对象。一般情况下,主线程是Python解释器开始时创建的线程。
threading.
setprofile
(func)
为所有 threading
模块开始的线程设置性能测试函数。在每个线程的 run()
方法被调用前,func 会被传递给 sys.setprofile()
。
threading.
stack_size
([size])
返回创建线程时用的堆栈大小。可选参数 size 指定之后新建的线程的堆栈大小,而且一定要是0(根据平台或者默认配置)或者最小是32,768(32KiB)的一个正整数。如果 size 没有指定,默认是0。如果不支持改变线程堆栈大小,会抛出 RuntimeError
错误。如果指定的堆栈大小不合法,会抛出 ValueError
错误并且不会修改堆栈大小。32KiB是当前最小的能保证解释器有足够堆栈空间的堆栈大小。需要注意的是部分平台对于堆栈大小会有特定的限制,例如要求大于32KiB的堆栈大小或者需要根据系统内存页面的整数倍进行分配 - 应当查阅平台文档有关详细信息(4KiB页面比较普遍,在没有更具体信息的情况下,建议的方法是使用4096的倍数作为堆栈大小)。
threading.
TIMEOUT_MAX
阻塞函数( Lock.acquire()
, RLock.acquire()
, Condition.wait()
, ...)中形参 timeout 允许的最大值。传入超过这个值的 timeout 会抛出 OverflowError
异常。
有个 "主线程" 对象;这对应Python程序里面初始的控制线程。它不是一个守护线程。
"虚拟线程对象" 是可以被创建的。这些是对应于“外部线程”的线程对象,它们是在线程模块外部启动的控制线程,例如直接来自C代码。虚拟线程对象功能受限;他们总是被认为是存活的和守护模式,不能被 join()
。因为无法检测外来线程的终结,它们永远不会被删除。
-
class
threading.
Thread
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) -
调用这个构造函数时,必需带有关键字参数。参数如下:
group 应该为
None
;为了日后扩展ThreadGroup
类实现而保留。target 是用于
run()
方法调用的可调用对象。默认是None
,表示不需要调用任何方法。name 是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其中 N 是小的十进制数。
args 是用于调用目标函数的参数元组。默认是
()
。kwargs 是用于调用目标函数的关键字参数字典。默认是
{}
。如果 daemon 不是
None
,线程将被显式的设置为 守护模式,不管该线程是否是守护模式。如果是None
(默认值),线程将继承当前线程的守护模式属性。如果子类型重载了构造函数,它一定要确保在做任何事前,先发起调用基类构造器(
Thread.__init__()
)。在 3.3 版更改: 加入 daemon 参数。
-
start
() -
开始线程活动。
它在一个线程里最多只能被调用一次。它安排对象的
run()
方法在一个独立的控制进程中调用。如果同一个线程对象中调用这个方法的次数大于一次,会抛出
RuntimeError
。
-
run
() -
代表线程活动的方法。
你可以在子类型里重载这个方法。 标准的
run()
方法会对作为 target 参数传递给该对象构造器的可调用对象(如果存在)发起调用,并附带从 args 和 kwargs 参数分别获取的位置和关键字参数。
-
join
(timeout=None) -
等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用
join()
的线程终结 -- 不管是正常终结还是抛出未处理异常 -- 或者直到发生超时,超时选项是可选的。当 timeout 参数存在而且不是
None
时,它应该是一个用于指定操作超时的以秒为单位的浮点数(或者分数)。因为join()
总是返回None
,所以你一定要在join()
后调用is_alive()
才能判断是否发生超时 -- 如果线程仍然存货,则join()
超时。当 timeout 参数不存在或者是
None
,这个操作会阻塞直到线程终结。一个线程可以被
join()
很多次。如果尝试加入当前线程会导致死锁,
join()
会引起RuntimeError
异常。如果尝试join()
一个尚未开始的线程,也会抛出相同的异常。
-
name
-
只用于识别的字符串。它没有语义。多个线程可以赋予相同的名称。 初始名称由构造函数设置。
-
getName
() -
setName
() -
旧的
name
取值/设值 API;直接当做特征属性使用它。
-
ident
-
这个线程的 ‘线程标识符‘,如果线程尚未开始则为
None
。这是个非零整数。参见get_ident()
函数。当一个线程退出而另外一个线程被创建,线程标识符会被复用。即使线程退出后,仍可得到标识符。
-
is_alive
() -
返回线程是否存活。
当
run()
方法刚开始直到run()
方法刚结束,这个方法返回True
。模块函数enumerate()
返回包含所有存活线程的列表。
-
daemon
-
一个表示这个线程是(True)否(False)守护线程的布尔值。一定要在调用
start()
前设置好,不然会抛出RuntimeError
。初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是daemon
=False
。当没有存活的非守护线程时,整个Python程序才会退出。
-
isDaemon
() -
setDaemon
() -
旧的
name
取值/设值 API;建议直接当做特征属性使用它。#线程是cpu最小的单元 from threading import Thread,active_count,current_thread,enumerate,main_thread,stack_size,Lock import time import random # print(active_count()) #返回当前的线程数 # print(current_thread()) #返回当前的线程Thread对象 # print(enumerate()) #反回当前活动的线程 # print(main_thread()) #返回当前的主线程 # print(stack_size()) #返回创建线程时使用的栈的大小,如果size参数,则用来指定后续创建的线程使用的栈的大小,size必须是0,(表示使用系统默认的值)大于32k的正整数 # arr = [] def task(num): time.sleep(3) print(num) # start = time.time() for i in range(0,5): t =Thread(target=task,args=(i,)) t.start() arr.append(t) # print(enumerate()) # t.join() for j in arr: j.join() end = time.time() print(‘时间:%s‘%(end-start)) ‘‘‘ Thread对象常用方法: start()运行线程 join()阻塞主线程 time阻塞时间 name 线程的名字 is_alive() 判断线程是否存活 ident 线程的标识 daemon 是否为守护进程 ‘‘‘ # class MyI(Thread): # def __init__(self): # super(MyI,self).__init__() # def run(self): # time.sleep(3) # print(self.name,self.ident,self.daemon) # # t1 = MyI() # t1.name = "任务一" # # t1.daemon=True # t1.start() # # h2=[] # l = Lock() # class xm(Thread): # # def __init__(self): # super(xm,self).__init__() # self.h = True # def run(self): # # if len(h2)<5: # l.acquire() # while self.h: # h2.append(1) # print(‘小明往锅里加了%s‘%len(h2)+‘个丸子‘,‘锅里有%s‘%len(h2)+‘个丸子‘) # if len(h2)==5: # l.release() # self.h = False # # class xh(Thread): # def __init__(self): # super(xh,self).__init__() # self.h = True # # def run(self): # i = 0 # if len(h2)==5: # l.acquire() # while self.h: # i+=1 # h2.pop(0) # print(‘小红吃了锅里的%s‘%(i)+‘个丸子‘,‘锅里有%s‘%len(h2)+‘个丸子‘) # if len(h2)==0: # l.release() # self.h = False # # # x1 = xm() # x1.start() # # x2 = xh() # x2.start() # # ‘‘‘ # def m(*args): # print(random.randint(1,args)) # Thread(group=None,target=m(),args=(1,2,34,6)) #构造函数 # target 线程启动执行函数 # args 是元组 是线程执行函数的参数 # kwargs 是线程执行函数的参数 # ‘‘‘
multiprocessing
是一个用与threading
模块相似API的支持产生进程的包。multiprocessing
包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。因此,multiprocessing
模块允许程序员充分利用机器上的多个核心。Unix 和 Windows 上都可以运行。multiprocessing
模块还引入了在threading
模块中没有类似物的API。这方面的一个主要例子是Pool
对象,它提供了一种方便的方法,可以跨多个输入值并行化函数的执行,跨进程分配输入数据(数据并行)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用Pool
,from multiprocessing import Pool def f(x): return x*x if __name__ == ‘__main__‘: with Pool(5) as p: print(p.map(f, [1, 2, 3]))
将打印到标准输出
[1, 4, 9]
Process
类在
multiprocessing
中,通过创建一个Process
对象然后调用它的start()
方法来生成进程。Process
和threading.Thread
API 相同。 一个简单的多进程程序示例是:from multiprocessing import Process def f(name): print(‘hello‘, name) if __name__ == ‘__main__‘: p = Process(target=f, args=(‘bob‘,)) p.start() p.join()
要显示所涉及的各个进程ID,这是一个扩展示例:
from multiprocessing import Process import os def info(title): print(title) print(‘module name:‘, __name__) print(‘parent process:‘, os.getppid()) print(‘process id:‘, os.getpid()) def f(name): info(‘function f‘) print(‘hello‘, name) if __name__ == ‘__main__‘: info(‘main line‘) p = Process(target=f, args=(‘bob‘,)) p.start() p.join()
POOL进程池
# from multiprocessing import Pool
# def fun():
# sum = 0
# for i in range(0,100000):
# sum+=i
# print(sum)
#
# if __name__ == ‘__main__‘:
# pool = Pool(5) #5个进程的进程池
# start = time.time()
# for i in range(100):
# # pool.apply(fun)
# obj = pool.apply_async(fun)
# print(obj)
#
# end =time.time()
# print(end-start)队列
Queue
类是一个近似queue.Queue
的克隆。 例如:from multiprocessing import Process, Queue def f(q): q.put([42, None, ‘hello‘]) if __name__ == ‘__main__‘: q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, ‘hello‘]" p.join()
队列是线程和进程安全的。
管道
Pipe()
函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, ‘hello‘]) conn.close() if __name__ == ‘__main__‘: parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, ‘hello‘]" p.join()
返回的两个连接对象
Pipe()
表示管道的两端。每个连接对象都有send()
和recv()
方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。进程之间的同步
multiprocessing
包含来自threading
的所有同步基本体的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print(‘hello world‘, i) finally: l.release() if __name__ == ‘__main__‘: lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
不使用来自不同进程的锁输出容易产生混淆。
在进程之间共享状态
如上所述,在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。
但是,如果你真的需要使用一些共享数据,那么
multiprocessing
提供了两种方法。共享内存
可以使用
Value
或Array
将数据存储在共享内存映射中。例如,以下代码:from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == ‘__main__‘: num = Value(‘d‘, 0.0) arr = Array(‘i‘, range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])
将打印
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
创建
num
和arr
时使用的‘d‘
和‘i‘
参数是array
模块使用的类型的 typecode :‘d‘
表示双精度浮点数,‘i‘
表示有符号整数。这些共享对象将是进程和线程安全的。为了更灵活地使用共享内存,可以使用
multiprocessing.sharedctypes
模块,该模块支持创建从共享内存分配的任意ctypes对象。服务器进程
由
Manager()
返回的管理器对象控制一个服务器进程,该进程保存Python对象并允许其他进程使用代理操作它们。Manager()
返回的管理器支持类型:list
、dict
、Namespace
、Lock
、RLock
、Semaphore
、BoundedSemaphore
、Condition
、Event
、Barrier
、Queue
、Value
和Array
。例如from multiprocessing import Process, Manager def f(d, l): d[1] = ‘1‘ d[‘2‘] = 2 d[0.25] = None l.reverse() if __name__ == ‘__main__‘: with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)
将打印
{0.25: None, 1: ‘1‘, ‘2‘: 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。
服务端的创建
from multiprocessing.managers import BaseManager
from queue import Queue
queue = Queue()
queue.put(‘lolololol‘)
class QueueManager(BaseManager): pass
QueueManager.register(‘get_queue‘, callable=lambda:queue)
m = QueueManager(address=(‘127.0.0.1‘, 5000), authkey=b‘2ab‘)
s = m.get_server()
s.serve_forever() #创建服务器
客户端的创建from multiprocessing.managers import BaseManagerclass QueueManager(BaseManager): passQueueManager.register(‘get_queue‘)m = QueueManager(address=(‘127.0.0.1‘, 5000), authkey=b‘2ab‘)m.connect()queue = m.get_queue()print(queue.get())
-