Python中实现定时任务详解

2023-12-05 0 436
目录
  • 固定时间间隔执行任务
    • time.sleep
    • threading.Timer
  • 固定时间点执行任务

    在项目中,我们可能遇到有定时任务的需求。

    • 其一:每隔一个时间段就执行任务。比如:压测中每隔45分钟调整温箱的温度。
    • 其二:定时执行任务。例如每天早上 8 点定时推送早报。

    今天,我跟大家分享下 Python 定时任务的实现方法。

    固定时间间隔执行任务

    import time
    import logging

    logging.basicConfig(
    level=logging.debug,
    format=\”%(asctime)s.%(msecs)d | %(threadName)s | %(levelname)s – %(message)s\”
    )

    def task():
    logging.info(\”Task Start.\”)
    time.sleep(1)
    logging.info(\”Task Done.\”)

    time.sleep

    第一种办法是最简单又最暴力。那就是在一个死循环中,使用线程睡眠函数 sleep()。

    while True:
    task()
    time.sleep(5)

    上述的方法有几个问题:

    • 阻塞主进程,这个也好解决,可以放到线程中去执行
    • 一次只有一个task,这个也有解决办法,可以多启几个线程

    threading.Timer

    既然第一种方法暴力,那么有没有比较优雅点的方法?Python 标准库 threading 中有个 Timer 类。它会新启动一个线程来执行定时任务,所以它是非阻塞函式。

    原理:线程中预置一个finished的事件,通过finished.wait等待固定时间间隔。超时则执行任务。如果需要取消任务,可以调用Timer.cancel来取消任务。

    from threading import Timer

    t = Timer(task, 5)
    t.start()

    优点:不阻塞主进程,task在线程中执行

    缺点:一个Timer只能执行一次task就结束了。如果想循环,需要改造一下task函数。

    from threading import Timer

    def repeat_task():
    t = Timer(5, repeat_task)
    # 开始任务的位置决定了是任务之间等待固定间隔时间
    # 还是每个任务的开始等待固定间隔时间
    t.start()
    task()

    这样可以循环执行,但是仍然只能一个线程一个任务。

    Q:如何跳出循环A:加入一个标识符:

    • sleep可以用一个布尔值来控制
    • Timer可以用一个Event来控制

    固定时间点执行任务

    以上是用内置的方法中比较简单的实现方式。简单的功能可以实现。

    前面执行的都是指定间隔时间的定时任务,那怎么执行指定时间点的任务呢?

    上面的方法是可以做到的。有两种思路,

    • 前面我们指定了间隔时间,那指定时间点,就先计算当前时间到指定时间点的间隔时间
    • 不管间隔时间,而是记录任务时间点,然后实时去检查是否到达指定时间点

    思路有了,但是对于固定时间点执行任务的场景以及后面更复杂的场景,自己实现可能就变得复杂。

    下面再介绍几个进阶的定时任务的实现方式,可以适应更复杂的业务场景。

    sched

    第三种方式是使用标准库中sched模块。sched是事件调度器,它通过scheduler类来调度事件,从而达到定时执行任务的效果。

    简单示例如下:

    import sched

    schedule = sched.scheduler() # 初始化 sched 模块的 scheduler 类
    schedule.enter(10, 1, task) # 增加调度任务
    schedule.run() # 开始调度任务

    scheduler 提供了两个添加调度任务的函数:

    • enter(delay, priority, action, argument=(), kwargs={})该函数可以延迟一定时间执行任务。delay 表示延迟多长时间执行任务,单位是秒。priority为优先级,越小优先级越大。两个任务指定相同的延迟时间,优先级大的任务会向被执行。action 即需要执行的函数,argument 和 kwargs 分别是函数的位置和关键字参数。

    • scheduler.enterabs(time, priority, action, argument=(), kwargs={})添加一项任务,但这个任务会在 time 这时刻执行。因此,time 是绝对时间。其他参数用法与 enter() 中的参数用法是一致。

    优点:

    • 执行时间间隔和时间点执行任务
    • 可以添加不同的任务
    • 任务可以设置优先级

    缺点:scheduler 中的每个调度任务只会工作一次,不会无限循环被调用。如果想重复执行同一任务, 需要重复添加调度任务即可。

    import sched
    import threading
    from functools import wraps

    s = sched.scheduler()
    STOP_FLAG = threading.Event()

    def repeat(interval):
    def wrapper(func):
    @wraps(func)
    def inner():
    if not STOP_FLAG.is_set():
    s.enter(interval, 0, inner)
    func()
    return inner

    return wrapper

    @repeat(5)
    def new_task():
    return task()

    new_task()
    t = threading.Thread(target=s.run)
    t.start()

    实现原理

    当然我们仅仅学会怎么用还是不够的,不能知其然而不知其所以然。介绍了那么多方法,那么多库,但是底层的实现逻辑都是差不多的。一个完整的定时任务系统,有三个部分:

    • 任务队列(task queue)根据执行时间和优先级进行排序排序sorted(): 任务一多,整个队列重排,性能堪忧heapq.pop():堆排序,只获取最高优先级的任务,不重复排序
    • 调度器(scheduler)
    • 执行器(executor)从前到后,实现的内容越来越多,控制的精细度也就越来越高,实现的功能也就越来越丰富。

    import sched
    import time
    import threading
    from functools import wraps

    from task import task
    from typing import Callable

    STOP_FLAG = threading.Event()

    def timeloop(func: Callable, interval: int):
    while True:
    func()
    time.sleep(interval)

    def repeat_task(func: Callable, interval: int):
    # 新建任务的前后,决定了是在任务结束后等待时间,还是固定间隔
    if not STOP_FLAG.is_set():
    t = threading.Timer(interval, repeat_task, args=(func, interval))
    t.start()
    func()

    def timer_schedule(func: Callable, interval: int):
    repeat_task(func, interval)

    s = sched.scheduler()

    def repeat(interval):
    def wrapper(func):
    @wraps(func)
    def inner():
    if not STOP_FLAG.is_set():
    s.enter(interval, 0, inner)
    func()
    return inner

    return wrapper

    @repeat(5)
    def new_task():
    return task()

    def sched_schedule(func: Callable, interval: int):
    func()
    t = threading.Thread(target=s.run)
    t.start()
    return t

    def main(schedule):
    schedule_thread = schedule(new_task, 5)
    while True:
    try:
    time.sleep(1)
    except KeyboardInterrupt:
    STOP_FLAG.set()
    schedule_thread.join()
    break

    if __name__ == \”__main__\”:
    # main(timeloop)
    # main(timer_schedule)
    main(sched_schedule)

    到此这篇关于Python中实现定时任务详解的文章就介绍到这了,更多相关Python中实现定时任务内容请搜索悠久资源网以前的文章或继续浏览下面的相关文章希望大家以后多多支持悠久资源网!

    您可能感兴趣的文章:

    • Python实现定时任务的九种方案总结
    • Pythonapscheduler实现定时任务的方法详解
    • python中实现定时任务的几种方案
    • Python实现自动定时登录校园网
    • Python定时执行程序问题(schedule)
    • Python命令行定时任务自动化工作流程
    • 一文详解Python定时任务触发
    • Windows下创建定时任务执行Python脚本的方法实现

    收藏 (0) 打赏

    感谢您的支持,我会继续努力的!

    打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
    点赞 (0)

    悠久资源 Python Python中实现定时任务详解 https://www.u-9.cn/jiaoben/python/100008.html

    常见问题

    相关文章

    发表评论
    暂无评论
    官方客服团队

    为您解决烦忧 - 24小时在线 专业服务