# date: 2021-10-13 """ # 每天0点:add_job(trigger='cron', hour='0') # 每1小时:add_job(trigger='interval', hour='1') # 每1分钟:add_job(trigger='cron', minutes='*/1') tips: 如果希望加强测试或重度使用,则推荐循环操作;如果轻度使用或保守推进,建议手动指定下次操作时间。 """ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.executors.pool import ThreadPoolExecutor from pymongo import MongoClient import uuid class APS(object): def __init__(self, db_type='mongo', db_host='aibox-middleware-mongo', db_port=27017, username='admin', password='admin', database='aibox', collection='LoopTask', is_clean=True): """ 内部访问: host: sri-thirdparty-mongo port: 27017 远程访问: host: 118.190.217.96、192.168.20.162 port: 7030 """ # todo 默认使用sqlite数据库,而不是mongo mongo = MongoClient(db_host, db_port, username=username, password=password) if is_clean: mongo[database][collection].delete_many({}) jobstores = { 'default': MongoDBJobStore(client=mongo, database=database, collection=collection), } executors = { 'default': ThreadPoolExecutor(max_workers=3) } self.scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, timezone='Asia/Shanghai') self.scheduler.start() # todo 不支持client方式查看job列表 def get_all(self): """ 获取任务列表 """ job_list = [] jobs = self.scheduler.get_jobs() for job in jobs: data = { 'id': job.id, 'name': job.name, 'func': job.func.__name__, 'args': list(job.args), 'trigger': str(job.trigger), 'next_run_time': job.next_run_time.strftime('%Y-%m-%d %H:%M:%S'), } job_list.append(data) return job_list def get_one(self, job_uuid): """获取任务详情""" job = self.scheduler.get_job(job_uuid) data = { 'id': job.id, 'name': job.name, 'func': job.func.__name__, 'args': list(job.args), 'trigger': str(job.trigger), 'next_run_time': job.next_run_time.strftime('%Y-%m-%d %H:%M:%S'), } return data def create_job(self, **params): """ 创建任务 trigger: 触发器 date 固定时间调度(只会执行一次) interval 固定时间间隔 cron 定时调度 """ params['id'] = str(uuid.uuid4()) # return self.scheduler.add_job(**params) return self.scheduler.add_job(**params, max_instances=100) # fix maximum number of running instances reached def remove_all(self): """删除全部任务""" jobs = self.scheduler.get_jobs() job_ids = [i.id for i in jobs] for job_id in job_ids: self.remove_one(job_id) return job_ids def remove_one(self, job_uuid): """删除指定任务""" return self.scheduler.remove_job(job_uuid) def pause_one(self, job_uuid): """暂停指定任务""" return self.scheduler.pause_job(job_uuid) def resume_one(self, job_uuid): """恢复指定任务""" return self.scheduler.resume_job(job_uuid) def pause_all(self): """暂停全部任务""" return self.scheduler.pause() def resume_all(self): """恢复全部任务""" return self.scheduler.resume() @classmethod def create_job_by_hour(cls, run_date, ins_id): """ 定时执行 run_date: 执行时间(%Y-%m-%d %H:%M:%S) ins_id: 数据id """ # str_task_id = str(uuid.uuid4()) # str_item = mdb.get_one_by_id('UserEventStrategy', ins_id) # str_task_id_list = str_item.get('str_task_id_list', []) # methods.debug_log('ApsClient.create_job_by_hour', f"str_task_id_list: {str_task_id_list}") # str_task_id_list.append(str_task_id) # mdb.update_one_by_id('UserEventStrategy', ins_id, {'str_task_id_list': str_task_id_list}) # run_date = datetime.datetime.strptime(run_date, "%Y-%m-%d %H:%M:%S") # cls.scheduler.add_job(func=action_event_strategy, trigger='date', run_date=run_date, id=str_task_id, # args=[ins_id, ]) @classmethod def create_job_by_interval(cls, ins_id, start_date, end_date): """ 每小时执行 :param ins_id: 数据id :param start_date: 开始时间("%Y-%m-%d %H:%M:%S") :param end_date: 结束时间("%Y-%m-%d %H:%M:%S") :return: """ # str_task_id = str(uuid.uuid4()) # str_item = mdb.get_one_by_id('UserEventStrategy', ins_id) # str_task_id_list = str_item.get('str_task_id_list', []) # methods.debug_log('ApsClient.create_job_by_interval', f"str_task_id_list: {str_task_id_list}") # str_task_id_list.append(str_task_id) # mdb.update_one_by_id('UserEventStrategy', ins_id, {'str_task_id_list': str_task_id_list}) # cls.scheduler.add_job(func=action_event_strategy, trigger='interval', id=str_task_id, hours=1, # start_date=start_date, end_date=end_date, args=[ins_id, ]) @classmethod def create_job_by_mon(cls, hour, minute, end_date, ins_id): """ 指定每周一固定时间执行 :param hour: 指定时 :param minute: 指定分 :param end_date: 结束时间("%Y-%m-%d %H:%M:%S") :param ins_id: 数据id :return: """ # end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d %H:%M:%S") # cls.scheduler.add_job(func=action_event_strategy, trigger='cron', day_of_week='mon', id=str(uuid.uuid4()), # hour=hour, minute=minute, end_date=end_date, args=[ins_id, ]) @classmethod def create_job_by_week(cls, _args, day_of_week, hour, minute, start_date, instance_id): """每周""" # cron_id = str(uuid.uuid4()) # cls.scheduler.add_job(send_charts, 'cron', day_of_week=day_of_week, id=cron_id, hour=hour, # minute=minute, # start_date=start_date, args=_args) # cls.save_job_id(instance_id, cron_id) # cls.scheduler.shutdown() @classmethod def create_job_by_month(cls, _args, month, day, hour, minute, start_date, instance_id): """每月""" # cron_id = str(uuid.uuid4()) # # cls.scheduler.add_job(send_charts, 'cron', month=month, day=day, id=cron_id, hour=hour, # minute=minute, start_date=start_date, args=_args, ) # cls.save_job_id(instance_id, cron_id) # cls.scheduler.shutdown() @classmethod def create_job_by_day(cls, _args, hour, minute, start_date, instance_id): """每天""" # cron_id = str(uuid.uuid4()) # # cls.scheduler.add_job(send_charts, 'cron', id=cron_id, hour=hour, start_date=start_date, # minute=minute, args=_args) # cls.save_job_id(instance_id, cron_id) # cls.scheduler.shutdown() @classmethod def create_job_by_hours(cls, _args, minute, start_date, instance_id): """每小时""" # cron_id = str(uuid.uuid4()) # cls.scheduler.add_job(send_charts, 'cron', id=cron_id, # minute=minute, start_date=start_date, args=_args) # cls.save_job_id(instance_id, cron_id) # cls.scheduler.shutdown() @classmethod def create_job_by_n_day(cls, _args, day, hour, minute, start_date, instance_id): """n天""" # cron_id = str(uuid.uuid4()) # cls.scheduler.add_job(send_charts, 'interval', id=cron_id, # days=day, hours=hour, minutes=minute, start_date=start_date, # args=_args) # cls.save_job_id(instance_id, cron_id) # cls.scheduler.shutdown() @classmethod def create_job_by_n_hour(cls, _args, hour, minute, start_date, instance_id): """n小时""" # cron_id = str(uuid.uuid4()) # cls.scheduler.add_job(send_charts, 'interval', id=cron_id, hours=hour, minutes=minute, # start_date=start_date, args=_args) # cls.save_job_id(instance_id, cron_id) # cls.scheduler.shutdown() @classmethod def create_job_by_n_minute(cls, _args, minute, start_date, instance_id): """n分钟""" # cron_id = str(uuid.uuid4()) # cls.scheduler.add_job(send_charts, 'interval', id=cron_id, minutes=minute, start_date=start_date, # args=_args) # cls.save_job_id(instance_id, cron_id) # cls.scheduler.shutdown() @classmethod def create_job_by_n_minute_alarm(cls, minute, start_date, end_date): """告警收敛""" # cls.scheduler.add_job(early_count, 'interval', id=str(uuid.uuid4()), minutes=minute, start_date=start_date, # end_date=end_date) # cls.scheduler.shutdown() if __name__ == '__main__': # --- init --- aps = APS(db_type='mongo', db_host='192.168.30.13', db_port=7030, username='admin', password='admin', database='vms', collection='LoopTask') # --- 并不支持client方式测试 --- pass