Skip to main content

Python APScheduler

Table of Contents

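APScheduler 是一个 Python 定时任务框架, 支持 cron、interval、date 三类触发器 (没有 Timeout 类型的触发器),
支持内存、MongoDB、SQLAlchemy、Redis 等多种任务存储, 支持线程池/进程池执行任务, 支持任务并发限制 (max_instances) 和错过执行的宽限时间 (misfire_grace_time), 可通过事件监听监控任务状态, 并通过标准 logging 输出日志. 注意: 它不内置任务失败重试, 也不支持多个进程共享同一个任务存储 (并非分布式调度框架)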

安装与介绍

 $ pip install apscheduler
$ python -c "import apscheduler" && echo "Installed"
> Installed

apscheduler 的基本组件

scheduler: 调度器, 用于调度任务
job: 任务, 定义了任务执行的内容
trigger: 触发器, 用于定义任务执行的规则
executor: 执行器, 用于执行任务
jobstore: 任务存储, 用于保存任务 (默认保存在内存中, 见下文"任务存储")

基本使用

常用的四种调度器:
BlockingScheduler: 阻塞调度器, start() 会阻塞当前线程, 适用于调度器是进程中唯一工作的场景
BackgroundScheduler: 后台调度器, 在后台线程中运行, 不阻塞主线程
AsyncIOScheduler: asyncio 调度器, 适用于基于 asyncio 的应用
GeventScheduler: gevent 调度器, 适用于基于 gevent 的应用

三种触发器:
cron: 基于 Cron 表达式的触发(周期性)
interval: 固定间隔触发
date: 基于日期, 特定时间只触发一次

from apscheduler.schedulers.background import BackgroundScheduler


def func(name='John'):
    """Demo job: print a greeting for ``name``."""
    print(f'Hello, world!, {name}')


schedule = BackgroundScheduler()  # pick one scheduler type
schedule.add_job(func, 'interval', seconds=5)  # run every 5 seconds
# Use the same variable as above (the original called an undefined `scheduler`).
schedule.add_job(func, 'cron', minute='*/5')  # run every 5 minutes


date = '2024-01-04 12:00:00'  # run exactly once at this point in time
schedule.add_job(func, 'date', run_date=date, args=['lily'])

# Nothing runs until the scheduler is started. BackgroundScheduler runs jobs in a
# background thread, so the process itself must stay alive afterwards.
schedule.start()
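add_job 参数含义
func: 任务函数
trigger: 触发器 ('cron' / 'interval' / 'date' 或触发器实例)
args: 任务函数的位置参数 (列表或元组)
kwargs: 任务函数的关键字参数 (字典)
id: 任务的唯一标识符
name: 任务的名称
misfire_grace_time: 错过执行时间的宽限秒数; 任务实际执行时间比计划时间晚超过该值时, 本次执行会被跳过 (并不是失败重试)
coalesce: 同一任务积压了多次未执行时, 是否合并为只执行一次
max_instances: 同一任务允许同时运行的最大实例数
replace_existing: 已存在相同 id 的任务时, 是否用新任务替换它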
from apscheduler.schedulers.blocking import BlockingScheduler


def job_func():
    """Demo job: print a greeting."""
    print('Hello, world!')


scheduler = BlockingScheduler()
scheduler.add_job(job_func, trigger="cron", second='*/5')  # every 5 seconds
# Without start() no job ever runs; BlockingScheduler.start() blocks the calling thread.
scheduler.start()

cron 触发器

cron 触发器很强大, 支持多种表达式以表示周期或者间隔

参数及取值范围:
second: 秒(0-59) | minute: 分钟(0-59) | hour: 小时(0-23) | day: 日(1-31) | day_of_week: 星期(0-6 或 mon-sun, 0 为周一) | week: ISO 周数(1-53) | month: 月(1-12)

设置固定时间点触发

from datetime import datetime

# Parse the wall-clock time once; only its hour/minute/second fields are used.
timestamp = datetime.strptime("12:30:00", "%H:%M:%S")
schedule.add_job(
    func,
    'cron',
    # Cron fields are singular (hour / minute / second); the original passed
    # `hours=` and `secondes=`, referenced an undefined `function`, and was
    # missing a comma.
    hour=timestamp.hour,
    minute=timestamp.minute,
    second=timestamp.second,
)

设置时间间隔触发

start1-end1/step1, start2-end2/step2
start 开始到 end 结束, 每隔 step 触发一次, 使用 , 分隔多个表达式

# Every cron field accepts "start-end/step" ranges and comma-separated lists.
schedule.add_job(func, trigger='cron', minute='*/5')    # any hour, every 5 minutes
schedule.add_job(func, trigger='cron', hour='1-6/2')    # from 1:00 to 6:00, every 2 hours
schedule.add_job(func, trigger='cron', day='1, 4, 6')   # on the 1st, 4th and 6th of each month

interval 触发器

interval 设置时间间隔触发

参数 (均表示间隔长度, 可组合使用): weeks 周 | days 天 | hours 小时 | minutes 分钟 | seconds 秒
注意: 参数名是复数形式, 且 interval 触发器不支持 months
from datetime import datetime

# Reuse strptime to turn an "HH:MM:SS" string into an interval length (here: 10 s).
timestamp = datetime.strptime("00:00:10", "%H:%M:%S")
schedule.add_job(
    func,
    'interval',
    # Interval arguments are plural (hours / minutes / seconds); the original
    # passed `minute=` and `secondes=`, referenced an undefined `function`, and
    # was missing a comma.
    hours=timestamp.hour,
    minutes=timestamp.minute,
    seconds=timestamp.second,
)

# IntervalTrigger only accepts plural keyword names (weeks/days/hours/minutes/seconds);
# the singular forms `minute`, `hour` and `day` are rejected.
schedule.add_job(func, 'interval', seconds=5)  # every 5 seconds
schedule.add_job(func, 'interval', minutes=5)  # every 5 minutes
schedule.add_job(func, 'interval', hours=5)    # every 5 hours
schedule.add_job(func, 'interval', days=5)     # every 5 days

注: 这里借用 "HH:MM:SS" 格式的时间字符串来表示间隔长度: 用 strptime 解析后取出时、分、秒, 分别作为 hours、minutes、seconds 传入

date 触发器

date 设置一个时间点只执行一次

参数:
run_date: 执行时间 (datetime 对象, 或 'YYYY-MM-DD HH:MM:SS' 格式的字符串) | timezone: 时区 (可选, 默认使用调度器的时区)
schedule.add_job(func, trigger='date', run_date='2024-01-08 10:30:00')  # fires once, at this exact time

任务存储

创建调度器时可以添加数据库存储任务

# The snippet uses BackgroundScheduler, so import that (the original imported
# BlockingScheduler instead).
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from pymongo import MongoClient


mongo = MongoDBJobStore(  # store jobs in MongoDB: database "db", collection "collection"
    database='db',
    collection='collection',
    client=MongoClient("mongodb://localhost:27017"),
)

schedule = BackgroundScheduler(  # background scheduler
    jobstores={'default': mongo},  # use the Mongo store as the default job store
)

# Start the object defined above (the original called an undefined `scheduler`).
schedule.start()
Mongo 中每个任务文档的字段:
_id: job.id | next_run_time: job.next_run_time (时间戳) | job_state: 序列化 (pickle) 后的任务状态

任务调度


from apscheduler.schedulers.background import BackgroundScheduler


def func():
    """Demo job: print a greeting."""
    print('Hello, world!')


# One consistent name (the original created `schedule` but used `scheduler`).
scheduler = BackgroundScheduler()
scheduler.start()
job = scheduler.add_job(func, 'interval', seconds=2, id="test")

# Option 1: act on the Job object returned by add_job; these methods take no id.
job.pause()
job.resume()

# Option 2: act through the scheduler, addressing the job by its id.
scheduler.pause_job('test')
scheduler.resume_job('test')

jobs = scheduler.get_jobs()  # list of Job objects (id, name, func, args, next_run_time, ...)

# Remove last: once removed, the id is gone and any further pause/resume/remove
# call for 'test' raises JobLookupError. job.remove() is the Job-object equivalent.
scheduler.remove_job('test')

job 常用属性: id, name, func, args, next_run_time

示例

使用内存作存储定时任务, 服务关闭任务丢失(可以设定数据库存储)

from time import sleep
from pytz import timezone
from datetime import datetime, timedelta

from loguru import logger
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.triggers.cron import CronTrigger
from apscheduler.job import Job


class Schedule:
    """Class-level wrapper around one shared BackgroundScheduler.

    Jobs are kept in a MemoryJobStore, so they are lost when the process exits
    (the commented-out SQLAlchemyJobStore line shows a persistent alternative).
    """

    # The shared scheduler; created by init(), which the helpers below call lazily.
    scheduler: BackgroundScheduler | None = None
    jobstores = {
        # 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
        'default': MemoryJobStore()
    }
    executors = {
        'default': ThreadPoolExecutor(20),
        # Not selected by any helper below; they all use the default executor.
        'processpool': ProcessPoolExecutor(5)
    }
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    }

    @classmethod
    def init(cls):
        """Create and start the shared scheduler in the Asia/Shanghai timezone.

        Calling it again while a scheduler exists is a no-op. A second scheduler
        would rebind the same job store and executor instances held in the class
        attributes above.
        """
        if cls.scheduler is not None:
            return
        cls.scheduler = BackgroundScheduler(
            jobstores=cls.jobstores,
            executors=cls.executors,
            job_defaults=cls.job_defaults,
            timezone=timezone("Asia/Shanghai")
        )
        cls.scheduler.start()

    @classmethod
    def _get_scheduler(cls):
        """Return the shared scheduler, calling init() first if it does not exist yet."""
        if cls.scheduler is None:
            cls.init()
        return cls.scheduler

    @classmethod
    def get_jobs(cls):
        """Return the list of all scheduled jobs."""
        return cls._get_scheduler().get_jobs()

    @classmethod
    def add_once_job(cls, id: str, func: callable, timestamp: datetime):
        """Schedule ``func`` to run once at ``timestamp`` and return the created Job.

        APScheduler interprets a naive ``timestamp`` in the scheduler's timezone
        (Asia/Shanghai), not the host's local time. Pass an aware datetime to be
        explicit.
        """
        return cls._get_scheduler().add_job(func, 'date', run_date=timestamp, id=id)

    @classmethod
    def add_cron_job(cls, id: str, func: callable, cron: str):
        """Schedule ``func`` on a standard 5-field crontab expression; return the Job.

        The expression is evaluated in the scheduler's timezone. Without an
        explicit ``timezone`` argument, ``CronTrigger.from_crontab`` would use the
        host's local timezone instead.
        """
        scheduler = cls._get_scheduler()
        trigger = CronTrigger.from_crontab(cron, timezone=scheduler.timezone)
        return scheduler.add_job(func, trigger=trigger, id=id)

    @classmethod
    def remove_job(cls, id: str):
        """Remove the job with the given id from the scheduler."""
        cls._get_scheduler().remove_job(id)

    @classmethod
    def parse_job(cls, job: Job):
        """Summarise a Job as a dict with its id, callable name and next run time."""
        # Callables such as functools.partial have no __name__; fall back to job.name.
        name = getattr(job.func, '__name__', job.name)
        return {"id": job.id, "job": name, "next": job.next_run_time}

# Custom job function, registered through add_once_job.
def once_task():
    """Print the time at which the job actually ran."""
    print(f"task run at {datetime.now()}")


# Override __name__ so that parse_job() shows a readable label for this job.
once_task.__name__ = f"run once_task at {datetime.now()}"

# The scheduler must exist (and be running) before jobs can be added.
Schedule.init()
# Use an aware "now" in the scheduler's timezone. A naive datetime.now() would be
# read as Asia/Shanghai time and land hours off on hosts in other timezones.
run_at = datetime.now(timezone("Asia/Shanghai")) + timedelta(hours=3)
Schedule.add_once_job(id="once_task", func=once_task, timestamp=run_at)

实现一个定时任务调度器, mongo 存储任务

from datetime import datetime

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from pymongo import MongoClient


mongo = MongoClient("mongodb://localhost:27017")
db = mongo['database']
# Human-readable task metadata. Keep it out of the 'tasks' collection that the
# MongoDBJobStore in Schedules uses: that store expects every document in its
# collection to be a serialized job and discards the ones it cannot restore.
Task_Info = db['task_info']

class Schedules:
    """ Scheduled-task manager.

    APScheduler persists the jobs themselves in the MongoDB job store below. A
    parallel, human-readable record for each job is kept in ``Task_Info`` and
    looked up by its ``'id'`` field.
    """

    scheduler = None  # created by init(); call init() before any other method
    jobstores = {
        'default': MongoDBJobStore(
            database='database',
            collection='tasks',
            client=mongo,
        )
    }
    executors = {
        'default': ThreadPoolExecutor(20),
        'processpool': ProcessPoolExecutor(5)
    }
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    }

    @classmethod
    def init(cls):
        """ Create and start the background scheduler. """
        cls.scheduler = BackgroundScheduler(
            jobstores=cls.jobstores,
            executors=cls.executors,
            job_defaults=cls.job_defaults,
        )
        cls.scheduler.start()

    @classmethod
    def _task_exists(cls, task_id):
        """ Return True if Task_Info already holds a record for ``task_id``. """
        return Task_Info.find_one({'id': task_id}) is not None

    @classmethod
    def _save_task(cls, job, func, task_type, time_str):
        """ Insert and return the Task_Info record describing a newly added job. """
        label = f"{func.__name__}{job.args}"
        record = {
            'id': job.id,
            'function': label,
            'type': task_type,
            'time': time_str,
            'enable': True,
            'description': f"{label} with {time_str}",
            'start': datetime.now().isoformat(),
        }
        # insert_one() adds an ``_id`` key to the dict it receives. Insert a copy
        # so the returned record carries no ObjectId, without re-reading it.
        Task_Info.insert_one(dict(record))
        return record

    @classmethod
    def add_cron_job(cls, id, func, cron_str, params=None):
        """ Run ``func`` every day at the time of day given by ``cron_str``.

        :param id: job id, also used as the id of the Task_Info record
        :param func: callable to execute
        :param cron_str: time of day formatted "%H:%M:%S", e.g. "08:30:00"
        :param params: optional iterable of positional arguments for ``func``
        :return: the new Task_Info record (without ``_id``), or the string
                 "task {id} already exist" when a record with this id exists.
                 In that case the scheduled job is left untouched.
        :raises ValueError: if ``cron_str`` does not match "%H:%M:%S"
        """
        at = datetime.strptime(cron_str, "%H:%M:%S")
        # Check before scheduling. Previously the live job was replaced first and
        # the old Task_Info record kept, so the two stores disagreed.
        if cls._task_exists(id):
            return f"task {id} already exist"
        job = cls.scheduler.add_job(
            func,
            'cron',
            id=id,
            args=None if params is None else [*params],
            hour=at.hour,
            minute=at.minute,
            second=at.second,
            coalesce=True,
            # Overwrites a job persisted in the job store that has no Task_Info record.
            replace_existing=True
        )
        return cls._save_task(job, func, 'cron', cron_str)

    @classmethod
    def add_interval_job(cls, id, func, interval_str, params=None):
        """ Run ``func`` repeatedly, every ``interval_str``.

        :param id: job id, also used as the id of the Task_Info record
        :param func: callable to execute
        :param interval_str: interval length formatted "%H:%M:%S", e.g. "00:05:00"
        :param params: optional iterable of positional arguments for ``func``
        :return: the new Task_Info record (without ``_id``), or the string
                 "task {id} already exist" when a record with this id exists.
                 In that case the scheduled job is left untouched.
        :raises ValueError: if ``interval_str`` does not match "%H:%M:%S"
        """
        every = datetime.strptime(interval_str, "%H:%M:%S")
        if cls._task_exists(id):
            return f"task {id} already exist"
        job = cls.scheduler.add_job(
            func,
            'interval',
            id=id,
            args=None if params is None else [*params],
            hours=every.hour,
            minutes=every.minute,
            seconds=every.second,
            coalesce=True,
            replace_existing=True
        )
        # (The former post-insert "is None -> already exist" check was dead and inverted.)
        return cls._save_task(job, func, 'interval', interval_str)

    @classmethod
    def get_job(cls, job_id):
        """ Return the scheduler's job as a string ("None" if no such job). """
        return str(cls.scheduler.get_job(job_id))

    @classmethod
    def list_jobs(cls):
        """ List every scheduled job: its Task_Info record plus the next run time. """
        status = []
        for job in cls.scheduler.get_jobs():
            # A job without a Task_Info record (e.g. added straight through the
            # scheduler) used to crash on ``None.pop``; fall back to its id.
            task = Task_Info.find_one({'id': job.id}, {'_id': False}) or {'id': job.id}
            status.append({
                **task,
                'next': job.next_run_time,
            })
        return status

    @classmethod
    def delete_job(cls, job_id):
        """ Remove the job from the scheduler and delete its Task_Info record. """
        cls.scheduler.remove_job(job_id)
        # Records are keyed by 'id' (see _save_task); the old {'name': ...} never matched.
        Task_Info.delete_one({'id': job_id})

    @classmethod
    def pause_job(cls, job_id):
        """ Pause the job and mark its Task_Info record as disabled. """
        cls.scheduler.pause_job(job_id)
        Task_Info.update_one({'id': job_id}, {'$set': {'enable': False}})

    @classmethod
    def resume_job(cls, job_id):
        """ Resume the job and mark its Task_Info record as enabled. """
        cls.scheduler.resume_job(job_id)
        Task_Info.update_one({'id': job_id}, {'$set': {'enable': True}})