定时任务

"定时"

Posted by zwt on November 4, 2020

安装

1
pip install apscheduler

基本组件

触发器triggers:主要包含调度逻辑,每个昨天都有自己的触发器,来确定下一个任务什么时刻执行。

1
2
3
date:特定的时间点触发
interval:固定时间间隔触发
cron:在特定时间周期性的触发

任务存储器:job stores:存放任务,任务存放在内存或者数据库 执行器executors:将任务提交到线程池或者进程池中运行,任务完成,通知触发器相应事件 调度器schedulers:把上方三个组件作为参数,通过创建调度器示例来运行

使用步骤

1
2
3
新建调度器
添加调度任务
运行调度任务

使用示例

date

参数

1
2
run_date (datetime 或 str)	作业的运行日期或时间
timezone (datetime.tzinfo 或 str)	指定时区

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):    
    print(text)

scheduler = BlockingScheduler()
# 在 2019-8-30 运行一次 job 方法
scheduler.add_job(job, 'date', run_date=date(2019, 8, 30), args=['text1'])
# 在 2019-8-30 01:00:00 运行一次 job 方法
scheduler.add_job(job, 'date', run_date=datetime(2019, 8, 30, 1, 0, 0), args=['text2'])
# 在 2019-8-30 01:00:01 运行一次 job 方法
scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=['text3'])

scheduler.start()

interval

参数:

1
2
3
4
5
6
7
weeks (int)	间隔几周
days (int)	间隔几天
hours (int)	间隔几小时
minutes (int)	间隔几分钟
seconds (int)	间隔多少秒
start_date (datetime 或 str)	开始日期
end_date (datetime 或 str)	结束日期

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):    
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('{} --- {}'.format(text, t))

scheduler = BlockingScheduler()
# 每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, args=['job1'])
# 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, seconds = 30, start_date='2019-08-29 22:15:00', end_date='2019-08-29 22:17:00', args=['job2'])

scheduler.start()

'''
运行结果:
job2 --- 2019-08-29 22:15:00
job1 --- 2019-08-29 22:15:46
job2 --- 2019-08-29 22:16:30
job1 --- 2019-08-29 22:16:46
job1 --- 2019-08-29 22:17:46
...余下省略...
'''

cron

参数:

1
2
3
4
5
6
7
8
9
10
11
year (int 或 str)	年,4位数字
month (int 或 str)	月 (范围1-12)
day (int 或 str)	日 (范围1-31)
week (int 或 str)	周 (范围1-53)
day_of_week (int 或 str)	周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str)	时 (范围0-23)
minute (int 或 str)	分 (范围0-59)
second (int 或 str)	秒 (范围0-59)
start_date (datetime 或 str)	最早开始日期(包含)
end_date (datetime 或 str)	最晚结束时间(包含)
timezone (datetime.tzinfo 或str)	指定时区

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
from apscheduler.schedulers.blocking import BlockingScheduler

def job(text):    
    t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    print('{} --- {}'.format(text, t))

scheduler = BlockingScheduler()
# 在每天22点,每隔 1分钟 运行一次 job 方法
scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1'])
# 在每天22和23点的25分,运行一次 job 方法
scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2'])

scheduler.start()

'''
运行结果:
job1 --- 2019-08-29 22:25:00
job2 --- 2019-08-29 22:25:00
job1 --- 2019-08-29 22:26:00
job1 --- 2019-08-29 22:27:00
...余下省略...
'''

参考

  1. scheduler