Python编排定时任务
Python编排定时任务
scheduler
使用友好的语法定期运行 Python
函数(或任何其他可调用函数), scheduler
有以下特点:
- 一个易于使用的作业调度 API。
- 用于定期作业的进程内调度程序,无需额外的进程。
- 非常轻量且没有外部依赖。
- 出色的测试覆盖率。
- 已在 Python 3.7、3.8、3.9、3.10 和 3.11 上测试。
使用 pip
即可安装 scheduler
模块
pip install scheduler
示例代码
import time
import schedule
from datetime import datetime
# 要执行的job方法
def job():
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"{current_time} 执行JOB")
# 每分钟执行一次作业
schedule.every(1).minutes.do(job)
# 每小时执行一次作业
schedule.every(1).hours.do(job)
# 每天的20:01执行一次作业
schedule.every().day.at("20:01").do(job)
# 每周日执行一次作业
schedule.every().sunday.do(job)
# 每周三的 20:00 执行一次作业
schedule.every().wednesday.at("20:00").do(job)
# 作业的执行监控,到了时间就执行作业
while True:
schedule.run_pending()
time.sleep(1)
运行如上代码,会得到如下结果。
这里是 schedule
的示例模块,作业以一定的间隔时间执行,schedule.every
是注册要执行的作业和执行间隔的描述,可以间隔几分钟、几小时甚至是特定的日期和时间运行一次。
# 每分钟执行一次作业
schedule.every(1).minutes.do(job)
schedule.run_pending()
是 schedule
库中的一个函数,用于运行所有已安排好的任务(即在定时任务的调度中,应该执行的任务),如果只是简单地调用 schedule.run_pending()
,一旦作业执行完毕,模块就会结束,需要使用 while
语句来创建一个无限循环,通过无限循环的状态,可以定期继续执行相同的过程。
while True:
schedule.run_pending()
time.sleep(1)
封装一个定时任务
如下是使用 schedule
模块后,封装清理日志文件的一段代码:
- 定义了
CleanLog
类,定义了属性、获取日志文件的方法、清理日志文件的方法、编排定时任务的方法。 - 文件声明了启动所需的参数,以及主函数。
- 主函数实例化
CleanLog
类,执行其编排定时任务的方法,创建了一个定时任务。
# -*- coding:utf-8 -*-
# !/usr/bin/env python
import os
import time
import schedule
import argparse
class CleanLog(object):
def __init__(self, cluster):
self.cluster = cluster
self.cluster_dir = os.path.join("/sidp/apps/ibdc", cluster, "logs")
def get_log_files(self):
"""获取日志文件列表"""
log_files = []
# 遍历目录
for root, dirs, files in os.walk(self.cluster_dir):
# 过滤出.log文件
for file in files:
if file.endswith('.log'):
# 构建完整路径
full_path = os.path.join(root, file)
log_files.append(full_path)
self.log_files = log_files
def clean_log_files(self):
"""清空日志文件"""
for file in self.log_files:
try:
with open(file, 'w') as f:
f.write('')
except Exception as e:
print(e)
def schedule_task(self):
"""编排定时任务"""
def job():
"""集群日志清理"""
self.get_log_files()
self.clean_log_files()
schedule.every().day.at("03:00").do(job)
# 作业的执行监控,到了时间就执行作业
while True:
schedule.run_pending()
time.sleep(1)
def main(cluster):
"""日志定时清理任务"""
# 获取日志清理对象
_cleanLog = CleanLog(cluster)
# 编排定时任务
_cleanLog.schedule_task()
if __name__ == '__main__':
# 脚本声明
parser = argparse.ArgumentParser(description="定时清理脚本")
# 添加参数
parser.add_argument('cluster', nargs='?', help='请传入集群参数')
# 解析参数
args = parser.parse_args()
# 检查参数是否传入
if args.cluster is None:
print("请传入集群参数!")
else:
main(args.cluster)
在主机上执行代码测试,由于指定的目录下存在较深的层级,检查每一层级的 .log
文件,在 19:37
时,均被清空。