目录 固定时间间隔执行任务 time.sleep threading.Timer 固定时间点执行任务 在项目中,我们可能遇到有定时任务的需求。 其一:每隔一个时间段就执行任务。 比如:压测中每隔45分钟调整温
- 固定时间间隔执行任务
- time.sleep
- threading.Timer
- 固定时间点执行任务
在项目中,我们可能遇到有定时任务的需求。
- 其一:每隔一个时间段就执行任务。
比如:压测中每隔45分钟调整温箱的温度。 - 其二:定时执行任务。
例如每天早上 8 点定时推送早报。
今天,我跟大家分享下 Python 定时任务的实现方法。
固定时间间隔执行任务import time import logging logging.basicConfig( level=logging.debug, format="%(asctime)s.%(msecs)d | %(threadName)s | %(levelname)s - %(message)s" ) def task(): logging.info("Task Start.") time.sleep(1) logging.info("Task Done.")time.sleep
第一种办法是最简单又最暴力。
那就是在一个死循环中,使用线程睡眠函数 sleep()。
while True: task() time.sleep(5)
上述的方法有几个问题:
- 阻塞主进程,这个也好解决,可以放到线程中去执行
- 一次只有一个task,这个也有解决办法,可以多启几个线程
既然第一种方法暴力,那么有没有比较优雅点的方法?
Python 标准库 threading 中有个 Timer 类。
它会新启动一个线程来执行定时任务,所以它是非阻塞函式。
原理:线程中预置一个finished的事件,通过finished.wait等待固定时间间隔。
超时则执行任务。如果需要取消任务,可以调用Timer.cancel来取消任务。
from threading import Timer t = Timer(task, 5) t.start()
优点:
不阻塞主进程,task在线程中执行
缺点:
一个Timer只能执行一次task就结束了。
如果想循环,需要改造一下task函数。
from threading import Timer def repeat_task(): t = Timer(5, repeat_task) # 开始任务的位置决定了是任务之间等待固定间隔时间 # 还是每个任务的开始等待固定间隔时间 t.start() task()
这样可以循环执行,但是仍然只能一个线程一个任务。
固定时间点执行任务Q:如何跳出循环
A:加入一个标识符:
sleep
可以用一个布尔值来控制Timer
可以用一个Event来控制