Skip to content

Python编排定时任务

904字约3分钟

Python

2024-10-30

Python编排定时任务

scheduler

使用友好的语法定期运行 Python 函数(或任何其他可调用函数), scheduler 有以下特点:

  • 一个易于使用的作业调度 API。
  • 用于定期作业的进程内调度程序,无需额外的进程。
  • 非常轻量且没有外部依赖。
  • 出色的测试覆盖率。
  • 已在 Python 3.7、3.8、3.9、3.10 和 3.11 上测试。

使用 pip 即可安装 scheduler 模块

pip install scheduler

示例代码

import time
import schedule
from datetime import datetime

# 要执行的job方法
def job():
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"{current_time} 执行JOB")

# 每分钟执行一次作业
schedule.every(1).minutes.do(job)

# 每小时执行一次作业
schedule.every(1).hours.do(job)

# 每天的20:01执行一次作业
schedule.every().day.at("20:01").do(job)

# 每周日执行一次作业
schedule.every().sunday.do(job)

# 每周三的 20:00 执行一次作业
schedule.every().wednesday.at("20:00").do(job)

# 作业的执行监控,到了时间就执行作业
while True:
    schedule.run_pending()
    time.sleep(1)

运行如上代码,会得到如下结果。

image-20241030200309954

这里是 schedule 的示例模块,作业以一定的间隔时间执行,schedule.every 是注册要执行的作业和执行间隔的描述,可以间隔几分钟、几小时甚至是特定的日期和时间运行一次。

# 每分钟执行一次作业
schedule.every(1).minutes.do(job)

schedule.run_pending()schedule 库中的一个函数,用于运行所有已安排好的任务(即在定时任务的调度中,应该执行的任务),如果只是简单地调用 schedule.run_pending(),一旦作业执行完毕,模块就会结束,需要使用 while 语句来创建一个无限循环,通过无限循环的状态,可以定期继续执行相同的过程。

while True:
    schedule.run_pending()
    time.sleep(1)

封装一个定时任务

如下是使用 schedule 模块后,封装清理日志文件的一段代码:

  • 定义了 CleanLog 类,定义了属性、获取日志文件的方法、清理日志文件的方法、编排定时任务的方法。
  • 文件声明了启动所需的参数,以及主函数。
  • 主函数实例化 CleanLog 类,执行其编排定时任务的方法,创建了一个定时任务。
# -*- coding:utf-8 -*-
# !/usr/bin/env python
import os
import time
import schedule
import argparse

class CleanLog(object):

    def __init__(self, cluster):
        self.cluster = cluster
        self.cluster_dir = os.path.join("/sidp/apps/ibdc", cluster, "logs")

    def get_log_files(self):
        """获取日志文件列表"""
        log_files = []

        # 遍历目录
        for root, dirs, files in os.walk(self.cluster_dir):
            # 过滤出.log文件
            for file in files:
                if file.endswith('.log'):
                    # 构建完整路径
                    full_path = os.path.join(root, file)
                    log_files.append(full_path)

        self.log_files = log_files

    def clean_log_files(self):
        """清空日志文件"""
        for file in self.log_files:
            try:
                with open(file, 'w') as f:
                    f.write('')
            except Exception as e:
                print(e)

    def schedule_task(self):
        """编排定时任务"""
        def job():
            """集群日志清理"""
            self.get_log_files()
            self.clean_log_files()

        schedule.every().day.at("03:00").do(job)

        # 作业的执行监控,到了时间就执行作业
        while True:
            schedule.run_pending()
            time.sleep(1)

def main(cluster):
    """日志定时清理任务"""
    # 获取日志清理对象
    _cleanLog = CleanLog(cluster)
    # 编排定时任务
    _cleanLog.schedule_task()



if __name__ == '__main__':
    # 脚本声明
    parser = argparse.ArgumentParser(description="定时清理脚本")
    # 添加参数
    parser.add_argument('cluster', nargs='?', help='请传入集群参数')
    # 解析参数
    args = parser.parse_args()
    # 检查参数是否传入
    if args.cluster is None:
        print("请传入集群参数!")
    else:
        main(args.cluster)

在主机上执行代码测试,由于指定的目录下存在较深的层级,检查每一层级的 .log 文件,在 19:37 时,均被清空。

Snipaste_2024-10-30_19-37-43

Snipaste_2024-10-30_19-38-30