123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- """
- # 每天0点:add_job(trigger='cron', hour='0')
- # 每1小时:add_job(trigger='interval', hour='1')
- # 每1分钟:add_job(trigger='cron', minutes='*/1')
- tips: 如果希望加强测试或重度使用,则推荐循环操作;如果轻度使用或保守推进,建议手动指定下次操作时间。
- """
- from apscheduler.schedulers.background import BackgroundScheduler
- from apscheduler.jobstores.mongodb import MongoDBJobStore
- from apscheduler.executors.pool import ThreadPoolExecutor
- from pymongo import MongoClient
- import uuid
- class APS(object):
- def __init__(self, db_type='mongo', db_host='aibox-middleware-mongo', db_port=27017,
- username='admin', password='admin', database='aibox', collection='LoopTask', is_clean=True):
- """
- 内部访问:
- host: sri-thirdparty-mongo
- port: 27017
- 远程访问:
- host: 118.190.217.96、192.168.20.162
- port: 7030
- """
-
- mongo = MongoClient(db_host, db_port, username=username, password=password)
- if is_clean:
- mongo[database][collection].delete_many({})
- jobstores = {
- 'default': MongoDBJobStore(client=mongo, database=database, collection=collection),
- }
- executors = {
- 'default': ThreadPoolExecutor(max_workers=3)
- }
- self.scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, timezone='Asia/Shanghai')
- self.scheduler.start()
- def get_all(self):
- """
- 获取任务列表
- """
- job_list = []
- jobs = self.scheduler.get_jobs()
- for job in jobs:
- data = {
- 'id': job.id,
- 'name': job.name,
- 'func': job.func.__name__,
- 'args': list(job.args),
- 'trigger': str(job.trigger),
- 'next_run_time': job.next_run_time.strftime('%Y-%m-%d %H:%M:%S'),
- }
- job_list.append(data)
- return job_list
- def get_one(self, job_uuid):
- """获取任务详情"""
- job = self.scheduler.get_job(job_uuid)
- data = {
- 'id': job.id,
- 'name': job.name,
- 'func': job.func.__name__,
- 'args': list(job.args),
- 'trigger': str(job.trigger),
- 'next_run_time': job.next_run_time.strftime('%Y-%m-%d %H:%M:%S'),
- }
- return data
- def create_job(self, **params):
- """
- 创建任务
- trigger: 触发器
- date 固定时间调度(只会执行一次)
- interval 固定时间间隔
- cron 定时调度
- """
- params['id'] = str(uuid.uuid4())
-
- return self.scheduler.add_job(**params, max_instances=100)
- def remove_all(self):
- """删除全部任务"""
- jobs = self.scheduler.get_jobs()
- job_ids = [i.id for i in jobs]
- for job_id in job_ids:
- self.remove_one(job_id)
- return job_ids
- def remove_one(self, job_uuid):
- """删除指定任务"""
- return self.scheduler.remove_job(job_uuid)
- def pause_one(self, job_uuid):
- """暂停指定任务"""
- return self.scheduler.pause_job(job_uuid)
- def resume_one(self, job_uuid):
- """恢复指定任务"""
- return self.scheduler.resume_job(job_uuid)
- def pause_all(self):
- """暂停全部任务"""
- return self.scheduler.pause()
- def resume_all(self):
- """恢复全部任务"""
- return self.scheduler.resume()
- @classmethod
- def create_job_by_hour(cls, run_date, ins_id):
- """
- 定时执行
- run_date: 执行时间(%Y-%m-%d %H:%M:%S)
- ins_id: 数据id
- """
-
-
-
-
-
-
-
-
-
- @classmethod
- def create_job_by_interval(cls, ins_id, start_date, end_date):
- """
- 每小时执行
- :param ins_id: 数据id
- :param start_date: 开始时间("%Y-%m-%d %H:%M:%S")
- :param end_date: 结束时间("%Y-%m-%d %H:%M:%S")
- :return:
- """
-
-
-
-
-
-
-
-
- @classmethod
- def create_job_by_mon(cls, hour, minute, end_date, ins_id):
- """
- 指定每周一固定时间执行
- :param hour: 指定时
- :param minute: 指定分
- :param end_date: 结束时间("%Y-%m-%d %H:%M:%S")
- :param ins_id: 数据id
- :return:
- """
-
-
-
- @classmethod
- def create_job_by_week(cls, _args, day_of_week, hour, minute, start_date, instance_id):
- """每周"""
-
-
-
-
-
-
- @classmethod
- def create_job_by_month(cls, _args, month, day, hour, minute, start_date, instance_id):
- """每月"""
-
-
-
-
-
-
- @classmethod
- def create_job_by_day(cls, _args, hour, minute, start_date, instance_id):
- """每天"""
-
-
-
-
-
-
- @classmethod
- def create_job_by_hours(cls, _args, minute, start_date, instance_id):
- """每小时"""
-
-
-
-
-
- @classmethod
- def create_job_by_n_day(cls, _args, day, hour, minute, start_date, instance_id):
- """n天"""
-
-
-
-
-
-
- @classmethod
- def create_job_by_n_hour(cls, _args, hour, minute, start_date, instance_id):
- """n小时"""
-
-
-
-
-
- @classmethod
- def create_job_by_n_minute(cls, _args, minute, start_date, instance_id):
- """n分钟"""
-
-
-
-
-
- @classmethod
- def create_job_by_n_minute_alarm(cls, minute, start_date, end_date):
- """告警收敛"""
-
-
-
- if __name__ == '__main__':
-
- aps = APS(db_type='mongo', db_host='192.168.30.13', db_port=7030,
- username='admin', password='admin', database='vms', collection='LoopTask')
-
- pass
|