Skip to content

Python-crontab 定时任务

736字约2分钟

Python

2023-06-29

python-crontab 第三方包实现主机上的定时任务。

1.安装

# 下载python-crontab的安装包
# 解压后,使用python直接进行安装
build/env/bin/python2.7 setup.py install

https://files.pythonhosted.org/packages/6b/ff/42c11cf843eedaff89c6ac5e65ab9935cb4af7bd13621708036ea8352d38/python-crontab-2.5.0.tar.gz

2.使用

# -*- coding=utf-8 -*-
import logging
from crontab import CronTab

LOG = logging.getLogger(__name__)

class CrontabTask(object):
    """
    定时任务方法类
    """

    def __init__(self):
        self.cron = CronTab(user=True)

    def addTask(self, cmd, task_name, time_dic):
        """
        新增定时任务
        @param cmd:
        @param task_name:
        @param time_dic:
        @return:
        """
        result = None
        try:
            task = self.cron.new(command=cmd, comment=task_name)
            if 'minute' in time_dic:
                task.minute.every(time_dic['minute'])
            if 'hour' in time_dic:
                task.hour.every(time_dic['hour'])
            self.cron.write()
        except Exception as e:
            LOG.exception(e)
            result = e
        return result

    def editTask(self, cmd, task_name, time_dic):
        """
        编辑定时任务
        @param cmd:
        @param task_name:
        @param time_dic:
        @return:
        """
        result = None
        try:
            tasks = self.cron.find_comment(task_name)
            for task in tasks:
                task.set_command(cmd)

                if 'minute' in time_dic:
                    task.minute.every(time_dic['minute'])
                if 'hour' in time_dic:
                    task.hour.every(time_dic['hour'])

            self.cron.write()
        except Exception as e:
            LOG.exception(e)
            result = e
        return result

    def deleteTask(self, task_name):
        """
        删除定时任务
        @param task_name:
        @return:
        """
        result = None
        try:
            tasks = self.cron.find_comment(task_name)
            for job in tasks:
                self.cron.remove(job)
            self.cron.write()
        except Exception as e:
            LOG.exception(e)
            result = e
        return result

    def getTasks(self):
        """
        获取定时任务的信息
        @return:
        """
        result = []
        try:
            tasks = str(self.cron).split('\n')
            for task in tasks:
                if task != '':
                    result.append(task.replace("\n",""))
        except Exception as e:
            LOG.exception(e)
            result = e
        return result

3.测试

3.1 获取定时任务

执行Python代码查看当前用户的定时任务。

获取定时任务列表

直接通过crontab命令查看当前用户的定时任务,两者内容一致。

Snipaste_2023-06-29_16-05-38

3.2 新增定时任务

执行Python代码新增定时任务。

新增定时任务

直接通过crontab命令查看当前用户的定时任务,出现使用Python代码新增的定时任务。

Snipaste_2023-06-29_16-08-23

3.3 修改定时任务

执行Python代码修改定时任务。

修改定时任务

直接通过crontab命令查看当前用户的定时任务,定时任务修改成功。

Snipaste_2023-06-29_16-16-20

3.4 删除定时任务

执行Python代码删除定时任务。

定时任务删除

直接通过crontab命令查看当前用户的定时任务,定时任务删除成功。

定时任务删除成功