2021-07-29 19:36:19
在Python中,使用APScheduler实现定时任务的核心步骤包括安装、选择调度器、配置调度器、添加任务、启动调度器,并可通过Cron表达式定义复杂时间规则,同时支持任务持久化与并发控制。 以下是详细配置指南:
1. 安装APScheduler使用pip安装APScheduler库:
pip install apscheduler2. 选择调度器类型APScheduler提供多种调度器,根据应用场景选择:
创建调度器实例时,可配置以下参数:
MemoryJobStore(默认):任务存储在内存,重启后丢失。
SQLAlchemyJobStore:任务持久化到数据库(需安装SQLAlchemy)。
ThreadPoolExecutor:线程池,适合I/O密集型任务。
ProcessPoolExecutor:进程池,适合CPU密集型任务。
coalesce:合并错过的任务(如服务器重启后只执行一次)。
max_instances:限制任务最大并发实例数(避免资源冲突)。
示例配置:
from apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutorjobstores = { 'default': SQLAlchemyJobStore(url='sqlite:///jobs.db') # 使用SQLite持久化}executors = { 'default': ThreadPoolExecutor(20), # 线程池,最大20个线程 'processpool': ProcessPoolExecutor(5) # 进程池,最大5个进程}job_defaults = { 'coalesce': False, # 不合并错过的任务 'max_instances': 3 # 同一任务最多3个并发实例}scheduler = BackgroundScheduler( jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone='Asia/Shanghai')4. 添加任务使用add_job()方法添加任务,需指定触发器类型和参数:
date:单次执行(如run_date='2024-01-01 00:00:00')。
interval:固定间隔执行(如seconds=10)。
cron:Cron表达式定义复杂规则。
func:要执行的函数。
args/kwargs:函数参数。
id:任务唯一ID(用于替换或删除)。
replace_existing:是否替换同名任务。
Cron表达式格式:
秒 分 时 日 月 周 年(可选)示例:
添加任务示例:
def print_time(text): from datetime import datetime print(f"{text}: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")# 间隔触发(每10秒)scheduler.add_job(print_time, 'interval', seconds=10, args=['Interval Job'])# Cron触发(每周一至周五9:30)scheduler.add_job( print_time, 'cron', hour=9, minute=30, day_of_week='mon-fri', args=['Cron Job'])5. 启动调度器调用start()方法启动调度器,并保持程序运行:
scheduler.start()try: while True: time.sleep(1)except (KeyboardInterrupt, SystemExit): scheduler.shutdown() # 优雅关闭6. 任务持久化使用SQLAlchemyJobStore将任务存储到数据库(如SQLite、MySQL、PostgreSQL),避免重启后任务丢失。
步骤:
I/O密集型任务:ThreadPoolExecutor。
CPU密集型任务:ProcessPoolExecutor(利用多核)。
并发控制示例:
def long_task(task_id): import time print(f"Task {task_id} started") time.sleep(5) # 模拟耗时操作 print(f"Task {task_id} finished")# 限制同一任务最多1个实例scheduler.add_job( long_task, 'interval', seconds=2, args=['A'], coalesce=True, max_instances=1)完整示例from apscheduler.schedulers.background import BackgroundSchedulerfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStoreimport timedef job_function(name): print(f"Job {name} executed at {time.strftime('%X')}")# 配置调度器jobstores = { 'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')}scheduler = BackgroundScheduler(jobstores=jobstores, timezone='Asia/Shanghai')# 添加任务scheduler.add_job( job_function, 'cron', args=['Cron'], hour=10, minute=30, day_of_week='mon-fri')scheduler.add_job( job_function, 'interval', args=['Interval'], seconds=10, coalesce=True, max_instances=1)# 启动调度器scheduler.start()try: while True: time.sleep(1)except (KeyboardInterrupt, SystemExit): scheduler.shutdown()通过以上配置,APScheduler可灵活实现定时任务,支持复杂时间规则、任务持久化与并发控制,适用于Web应用、数据处理、自动化运维等场景。