# date: 2021-10-13
"""
# Every day at 00:00: add_job(trigger='cron', hour='0')
# Every hour: add_job(trigger='interval', hours=1)
# Every minute: add_job(trigger='cron', minute='*/1')
Tip: for stress testing or heavy use, a recurring (looping) job is recommended;
for light use or a cautious rollout, specify the next run time manually instead.
"""
import uuid
from typing import Any, Dict, List, Optional

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from pymongo import MongoClient
class APS(object):
    """Convenience wrapper around an APScheduler ``BackgroundScheduler``.

    The scheduler is created with a MongoDB job store and a three-worker
    thread pool, and it is started as soon as the instance is constructed.
    """

    # Format used when a job's next run time is rendered as text.
    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, db_type: str = 'mongo', db_host: str = 'aibox-middleware-mongo', db_port: int = 27017,
                 username: str = 'admin', password: str = 'admin', database: str = 'aibox',
                 collection: str = 'LoopTask', is_clean: bool = True) -> None:
        """Connect to MongoDB, build the scheduler and start it.

        Connection notes carried over from the original author:
            Internal access: host ``sri-thirdparty-mongo``, port ``27017``.
            Remote access: host ``118.190.217.96`` or ``192.168.20.162``, port ``7030``.

        Args:
            db_type: Accepted for interface compatibility; it is not read by
                this constructor (only MongoDB is wired up).
            db_host: MongoDB host name or address.
            db_port: MongoDB port.
            username: MongoDB user name.
            password: MongoDB password.
            database: Database that holds the job collection.
            collection: Collection used as the job store.
            is_clean: When true, every document in ``collection`` is deleted
                before the scheduler starts.
        """
        # TODO: default to a SQLite job store instead of MongoDB.
        mongo = MongoClient(db_host, db_port, username=username, password=password)
        if is_clean:
            # Start from an empty job collection.
            mongo[database][collection].delete_many({})
        jobstores = {
            'default': MongoDBJobStore(client=mongo, database=database, collection=collection),
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=3),
        }
        self.scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, timezone='Asia/Shanghai')
        # TODO: inspecting the job list from a separate client is not supported.
        self.scheduler.start()

    @classmethod
    def _job_to_dict(cls, job: Any) -> Dict[str, Any]:
        """Render a scheduler job as a plain dictionary."""
        next_run = job.next_run_time
        return {
            'id': job.id,
            'name': job.name,
            'func': job.func.__name__,
            'args': list(job.args),
            'trigger': str(job.trigger),
            # A paused job has no next run time; report None instead of
            # failing on ``None.strftime``.
            'next_run_time': next_run.strftime(cls._TIME_FORMAT) if next_run is not None else None,
        }

    def get_all(self) -> List[Dict[str, Any]]:
        """Return every job known to the scheduler as a list of dictionaries.

        Each entry has the keys ``id``, ``name``, ``func``, ``args``,
        ``trigger`` and ``next_run_time`` (``None`` when the job has no next
        run time, e.g. while it is paused).
        """
        return [self._job_to_dict(job) for job in self.scheduler.get_jobs()]

    def get_one(self, job_uuid: str) -> Optional[Dict[str, Any]]:
        """Return the details of one job, or ``None`` if the id is unknown.

        Args:
            job_uuid: Id of the job to look up.
        """
        job = self.scheduler.get_job(job_uuid)
        if job is None:
            return None
        return self._job_to_dict(job)

    def create_job(self, **params: Any) -> Any:
        """Create a job; ``params`` are forwarded to ``scheduler.add_job``.

        Trigger kinds (``trigger=``):
            date: run once at a fixed time.
            interval: run at a fixed time interval.
            cron: run on a cron-style schedule.

        The job id is always replaced by a freshly generated UUID string.
        ``max_instances`` defaults to 100 (to avoid "maximum number of running
        instances reached"), but an explicit caller value is respected.

        Returns:
            Whatever ``scheduler.add_job`` returns.
        """
        params['id'] = str(uuid.uuid4())
        # setdefault instead of a literal keyword: passing max_instances both
        # in **params and as a keyword would raise TypeError.
        params.setdefault('max_instances', 100)
        return self.scheduler.add_job(**params)

    def remove_all(self) -> List[str]:
        """Remove every job and return the ids that were removed."""
        job_ids = [job.id for job in self.scheduler.get_jobs()]
        for job_id in job_ids:
            self.remove_one(job_id)
        return job_ids

    def remove_one(self, job_uuid: str) -> Any:
        """Remove the job with the given id."""
        return self.scheduler.remove_job(job_uuid)

    def pause_one(self, job_uuid: str) -> Any:
        """Pause the job with the given id."""
        return self.scheduler.pause_job(job_uuid)

    def resume_one(self, job_uuid: str) -> Any:
        """Resume the job with the given id."""
        return self.scheduler.resume_job(job_uuid)

    def pause_all(self) -> Any:
        """Pause all jobs by calling ``scheduler.pause()``."""
        return self.scheduler.pause()

    def resume_all(self) -> Any:
        """Resume all jobs by calling ``scheduler.resume()``."""
        return self.scheduler.resume()

    # ------------------------------------------------------------------
    # Legacy helpers. Their implementations were commented out in the
    # original file, so each one is currently a no-op that returns None.
    # The disabled code used ``cls.scheduler``, which this class does not
    # define (the scheduler is an instance attribute set in ``__init__``),
    # and helpers that are not defined in this file.
    # ------------------------------------------------------------------

    @classmethod
    def create_job_by_hour(cls, run_date, ins_id):
        """Run once at a given time (currently a no-op).

        Args:
            run_date: Execution time, formatted ``%Y-%m-%d %H:%M:%S``.
            ins_id: Id of the data record.

        Disabled implementation: generated a UUID job id, appended it to the
        ``str_task_id_list`` of the ``UserEventStrategy`` record ``ins_id``,
        and scheduled ``action_event_strategy`` with a ``date`` trigger at
        ``run_date`` and ``args=[ins_id]``.
        """

    @classmethod
    def create_job_by_interval(cls, ins_id, start_date, end_date):
        """Run every hour (currently a no-op).

        Args:
            ins_id: Id of the data record.
            start_date: Start time, formatted ``%Y-%m-%d %H:%M:%S``.
            end_date: End time, formatted ``%Y-%m-%d %H:%M:%S``.

        Disabled implementation: recorded a new UUID job id on the
        ``UserEventStrategy`` record and scheduled ``action_event_strategy``
        with an ``interval`` trigger (``hours=1``) between ``start_date`` and
        ``end_date``.
        """

    @classmethod
    def create_job_by_mon(cls, hour, minute, end_date, ins_id):
        """Run every Monday at a fixed time (currently a no-op).

        Args:
            hour: Hour of day.
            minute: Minute of the hour.
            end_date: End time, formatted ``%Y-%m-%d %H:%M:%S``.
            ins_id: Id of the data record.

        Disabled implementation: scheduled ``action_event_strategy`` with a
        ``cron`` trigger (``day_of_week='mon'``) until ``end_date``.
        """

    @classmethod
    def create_job_by_week(cls, _args, day_of_week, hour, minute, start_date, instance_id):
        """Weekly schedule (currently a no-op).

        Disabled implementation: scheduled ``send_charts`` with a ``cron``
        trigger on ``day_of_week`` at ``hour``:``minute`` from ``start_date``
        with ``args=_args``, saved the job id for ``instance_id``, then shut
        the scheduler down.
        """

    @classmethod
    def create_job_by_month(cls, _args, month, day, hour, minute, start_date, instance_id):
        """Monthly schedule (currently a no-op).

        Disabled implementation: scheduled ``send_charts`` with a ``cron``
        trigger on ``month``/``day`` at ``hour``:``minute`` from
        ``start_date`` with ``args=_args``, saved the job id for
        ``instance_id``, then shut the scheduler down.
        """

    @classmethod
    def create_job_by_day(cls, _args, hour, minute, start_date, instance_id):
        """Daily schedule (currently a no-op).

        Disabled implementation: scheduled ``send_charts`` with a ``cron``
        trigger at ``hour``:``minute`` from ``start_date`` with
        ``args=_args``, saved the job id for ``instance_id``, then shut the
        scheduler down.
        """

    @classmethod
    def create_job_by_hours(cls, _args, minute, start_date, instance_id):
        """Hourly schedule (currently a no-op).

        Disabled implementation: scheduled ``send_charts`` with a ``cron``
        trigger at ``minute`` from ``start_date`` with ``args=_args``, saved
        the job id for ``instance_id``, then shut the scheduler down.
        """

    @classmethod
    def create_job_by_n_day(cls, _args, day, hour, minute, start_date, instance_id):
        """Every n days (currently a no-op).

        Disabled implementation: scheduled ``send_charts`` with an
        ``interval`` trigger (``days=day, hours=hour, minutes=minute``) from
        ``start_date`` with ``args=_args``, saved the job id for
        ``instance_id``, then shut the scheduler down.
        """

    @classmethod
    def create_job_by_n_hour(cls, _args, hour, minute, start_date, instance_id):
        """Every n hours (currently a no-op).

        Disabled implementation: scheduled ``send_charts`` with an
        ``interval`` trigger (``hours=hour, minutes=minute``) from
        ``start_date`` with ``args=_args``, saved the job id for
        ``instance_id``, then shut the scheduler down.
        """

    @classmethod
    def create_job_by_n_minute(cls, _args, minute, start_date, instance_id):
        """Every n minutes (currently a no-op).

        Disabled implementation: scheduled ``send_charts`` with an
        ``interval`` trigger (``minutes=minute``) from ``start_date`` with
        ``args=_args``, saved the job id for ``instance_id``, then shut the
        scheduler down.
        """

    @classmethod
    def create_job_by_n_minute_alarm(cls, minute, start_date, end_date):
        """Alarm convergence (currently a no-op).

        Disabled implementation: scheduled ``early_count`` with an
        ``interval`` trigger (``minutes=minute``) between ``start_date`` and
        ``end_date``, then shut the scheduler down.
        """
if __name__ == '__main__':
    # Manual smoke run: build the scheduler against a remote MongoDB.
    # is_clean is left at its default, so the target collection is emptied.
    connection_settings = {
        'db_type': 'mongo',
        'db_host': '192.168.30.13',
        'db_port': 7030,
        'username': 'admin',
        'password': 'admin',
        'database': 'vms',
        'collection': 'LoopTask',
    }
    aps = APS(**connection_settings)
    # Testing from a separate client is not supported.