当前位置 : 主页 > 编程语言 > python >

Python中实现定时任务详解

来源:互联网 收集:自由互联 发布时间:2023-08-17
目录 固定时间间隔执行任务 time.sleep threading.Timer 固定时间点执行任务 在项目中,我们可能遇到有定时任务的需求。 其一:每隔一个时间段就执行任务。 比如:压测中每隔45分钟调整温
目录
  • 固定时间间隔执行任务
    • time.sleep
    • threading.Timer
  • 固定时间点执行任务

    在项目中,我们可能遇到有定时任务的需求。

    • 其一:每隔一个时间段就执行任务。
      比如:压测中每隔45分钟调整温箱的温度。
    • 其二:定时执行任务。
      例如每天早上 8 点定时推送早报。

    今天,我跟大家分享下 Python 定时任务的实现方法。

    固定时间间隔执行任务
    import time
    import logging
    
    logging.basicConfig(
        level=logging.debug,
        format="%(asctime)s.%(msecs)d | %(threadName)s | %(levelname)s - %(message)s"
    )
    
    
    def task():
        logging.info("Task Start.")
        time.sleep(1)
        logging.info("Task Done.")
    

    time.sleep

    第一种办法是最简单又最暴力。
    那就是在一个死循环中,使用线程睡眠函数 sleep()。

    while True:
        task()
        time.sleep(5)

    上述的方法有几个问题:

    • 阻塞主进程,这个也好解决,可以放到线程中去执行
    • 一次只有一个task,这个也有解决办法,可以多启几个线程

    threading.Timer

    既然第一种方法暴力,那么有没有比较优雅点的方法?
    Python 标准库 threading 中有个 Timer 类。
    它会新启动一个线程来执行定时任务,所以它是非阻塞函式。

    原理:线程中预置一个finished的事件,通过finished.wait等待固定时间间隔。
    超时则执行任务。如果需要取消任务,可以调用Timer.cancel来取消任务。

    from threading import Timer
    
    t = Timer(task, 5)
    t.start()
    

    优点:
    不阻塞主进程,task在线程中执行

    缺点:
    一个Timer只能执行一次task就结束了。
    如果想循环,需要改造一下task函数。

    from threading import Timer
    
    def repeat_task():
        t = Timer(5, repeat_task)
        # 开始任务的位置决定了是任务之间等待固定间隔时间
        # 还是每个任务的开始等待固定间隔时间
        t.start()
        task()
    

    这样可以循环执行,但是仍然只能一个线程一个任务。

    Q:如何跳出循环
    A:加入一个标识符:

    • sleep可以用一个布尔值来控制
    • Timer可以用一个Event来控制

    固定时间点执行任务

    【文章原创作者:大丰网站制作公司 http://www.1234xp.com/dafeng.html 提供,感恩】

    网友评论