xapscheduler.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
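# date: 2021-10-13
"""
APScheduler trigger cheat sheet:

# Every day at 00:00: add_job(trigger='cron', hour='0')
# Every hour:         add_job(trigger='interval', hours=1)
# Every minute:       add_job(trigger='cron', minute='*/1')

tips: For intensive testing or heavy use, prefer a recurring (looping) job; for light use
or a cautious rollout, specify the next run time manually instead.
"""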
  8. from apscheduler.schedulers.background import BackgroundScheduler
  9. from apscheduler.jobstores.mongodb import MongoDBJobStore
  10. from apscheduler.executors.pool import ThreadPoolExecutor
  11. from pymongo import MongoClient
  12. import uuid
  13. class APS(object):
  14. def __init__(self, db_type='mongo', db_host='aibox-middleware-mongo', db_port=27017,
  15. username='admin', password='admin', database='aibox', collection='LoopTask', is_clean=True):
  16. """
  17. 内部访问:
  18. host: sri-thirdparty-mongo
  19. port: 27017
  20. 远程访问:
  21. host: 118.190.217.96、192.168.20.162
  22. port: 7030
  23. """
  24. # todo 默认使用sqlite数据库,而不是mongo
  25. mongo = MongoClient(db_host, db_port, username=username, password=password)
  26. if is_clean:
  27. mongo[database][collection].delete_many({})
  28. jobstores = {
  29. 'default': MongoDBJobStore(client=mongo, database=database, collection=collection),
  30. }
  31. executors = {
  32. 'default': ThreadPoolExecutor(max_workers=3)
  33. }
  34. self.scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, timezone='Asia/Shanghai')
  35. self.scheduler.start() # todo 不支持client方式查看job列表
  36. def get_all(self):
  37. """
  38. 获取任务列表
  39. """
  40. job_list = []
  41. jobs = self.scheduler.get_jobs()
  42. for job in jobs:
  43. data = {
  44. 'id': job.id,
  45. 'name': job.name,
  46. 'func': job.func.__name__,
  47. 'args': list(job.args),
  48. 'trigger': str(job.trigger),
  49. 'next_run_time': job.next_run_time.strftime('%Y-%m-%d %H:%M:%S'),
  50. }
  51. job_list.append(data)
  52. return job_list
  53. def get_one(self, job_uuid):
  54. """获取任务详情"""
  55. job = self.scheduler.get_job(job_uuid)
  56. data = {
  57. 'id': job.id,
  58. 'name': job.name,
  59. 'func': job.func.__name__,
  60. 'args': list(job.args),
  61. 'trigger': str(job.trigger),
  62. 'next_run_time': job.next_run_time.strftime('%Y-%m-%d %H:%M:%S'),
  63. }
  64. return data
  65. def create_job(self, **params):
  66. """
  67. 创建任务
  68. trigger: 触发器
  69. date 固定时间调度(只会执行一次)
  70. interval 固定时间间隔
  71. cron 定时调度
  72. """
  73. params['id'] = str(uuid.uuid4())
  74. # return self.scheduler.add_job(**params)
  75. return self.scheduler.add_job(**params, max_instances=100) # fix maximum number of running instances reached
  76. def remove_all(self):
  77. """删除全部任务"""
  78. jobs = self.scheduler.get_jobs()
  79. job_ids = [i.id for i in jobs]
  80. for job_id in job_ids:
  81. self.remove_one(job_id)
  82. return job_ids
  83. def remove_one(self, job_uuid):
  84. """删除指定任务"""
  85. return self.scheduler.remove_job(job_uuid)
  86. def pause_one(self, job_uuid):
  87. """暂停指定任务"""
  88. return self.scheduler.pause_job(job_uuid)
  89. def resume_one(self, job_uuid):
  90. """恢复指定任务"""
  91. return self.scheduler.resume_job(job_uuid)
  92. def pause_all(self):
  93. """暂停全部任务"""
  94. return self.scheduler.pause()
  95. def resume_all(self):
  96. """恢复全部任务"""
  97. return self.scheduler.resume()
  98. @classmethod
  99. def create_job_by_hour(cls, run_date, ins_id):
  100. """
  101. 定时执行
  102. run_date: 执行时间(%Y-%m-%d %H:%M:%S)
  103. ins_id: 数据id
  104. """
  105. # str_task_id = str(uuid.uuid4())
  106. # str_item = mdb.get_one_by_id('UserEventStrategy', ins_id)
  107. # str_task_id_list = str_item.get('str_task_id_list', [])
  108. # methods.debug_log('ApsClient.create_job_by_hour', f"str_task_id_list: {str_task_id_list}")
  109. # str_task_id_list.append(str_task_id)
  110. # mdb.update_one_by_id('UserEventStrategy', ins_id, {'str_task_id_list': str_task_id_list})
  111. # run_date = datetime.datetime.strptime(run_date, "%Y-%m-%d %H:%M:%S")
  112. # cls.scheduler.add_job(func=action_event_strategy, trigger='date', run_date=run_date, id=str_task_id,
  113. # args=[ins_id, ])
  114. @classmethod
  115. def create_job_by_interval(cls, ins_id, start_date, end_date):
  116. """
  117. 每小时执行
  118. :param ins_id: 数据id
  119. :param start_date: 开始时间("%Y-%m-%d %H:%M:%S")
  120. :param end_date: 结束时间("%Y-%m-%d %H:%M:%S")
  121. :return:
  122. """
  123. # str_task_id = str(uuid.uuid4())
  124. # str_item = mdb.get_one_by_id('UserEventStrategy', ins_id)
  125. # str_task_id_list = str_item.get('str_task_id_list', [])
  126. # methods.debug_log('ApsClient.create_job_by_interval', f"str_task_id_list: {str_task_id_list}")
  127. # str_task_id_list.append(str_task_id)
  128. # mdb.update_one_by_id('UserEventStrategy', ins_id, {'str_task_id_list': str_task_id_list})
  129. # cls.scheduler.add_job(func=action_event_strategy, trigger='interval', id=str_task_id, hours=1,
  130. # start_date=start_date, end_date=end_date, args=[ins_id, ])
  131. @classmethod
  132. def create_job_by_mon(cls, hour, minute, end_date, ins_id):
  133. """
  134. 指定每周一固定时间执行
  135. :param hour: 指定时
  136. :param minute: 指定分
  137. :param end_date: 结束时间("%Y-%m-%d %H:%M:%S")
  138. :param ins_id: 数据id
  139. :return:
  140. """
  141. # end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d %H:%M:%S")
  142. # cls.scheduler.add_job(func=action_event_strategy, trigger='cron', day_of_week='mon', id=str(uuid.uuid4()),
  143. # hour=hour, minute=minute, end_date=end_date, args=[ins_id, ])
  144. @classmethod
  145. def create_job_by_week(cls, _args, day_of_week, hour, minute, start_date, instance_id):
  146. """每周"""
  147. # cron_id = str(uuid.uuid4())
  148. # cls.scheduler.add_job(send_charts, 'cron', day_of_week=day_of_week, id=cron_id, hour=hour,
  149. # minute=minute,
  150. # start_date=start_date, args=_args)
  151. # cls.save_job_id(instance_id, cron_id)
  152. # cls.scheduler.shutdown()
  153. @classmethod
  154. def create_job_by_month(cls, _args, month, day, hour, minute, start_date, instance_id):
  155. """每月"""
  156. # cron_id = str(uuid.uuid4())
  157. #
  158. # cls.scheduler.add_job(send_charts, 'cron', month=month, day=day, id=cron_id, hour=hour,
  159. # minute=minute, start_date=start_date, args=_args, )
  160. # cls.save_job_id(instance_id, cron_id)
  161. # cls.scheduler.shutdown()
  162. @classmethod
  163. def create_job_by_day(cls, _args, hour, minute, start_date, instance_id):
  164. """每天"""
  165. # cron_id = str(uuid.uuid4())
  166. #
  167. # cls.scheduler.add_job(send_charts, 'cron', id=cron_id, hour=hour, start_date=start_date,
  168. # minute=minute, args=_args)
  169. # cls.save_job_id(instance_id, cron_id)
  170. # cls.scheduler.shutdown()
  171. @classmethod
  172. def create_job_by_hours(cls, _args, minute, start_date, instance_id):
  173. """每小时"""
  174. # cron_id = str(uuid.uuid4())
  175. # cls.scheduler.add_job(send_charts, 'cron', id=cron_id,
  176. # minute=minute, start_date=start_date, args=_args)
  177. # cls.save_job_id(instance_id, cron_id)
  178. # cls.scheduler.shutdown()
  179. @classmethod
  180. def create_job_by_n_day(cls, _args, day, hour, minute, start_date, instance_id):
  181. """n天"""
  182. # cron_id = str(uuid.uuid4())
  183. # cls.scheduler.add_job(send_charts, 'interval', id=cron_id,
  184. # days=day, hours=hour, minutes=minute, start_date=start_date,
  185. # args=_args)
  186. # cls.save_job_id(instance_id, cron_id)
  187. # cls.scheduler.shutdown()
  188. @classmethod
  189. def create_job_by_n_hour(cls, _args, hour, minute, start_date, instance_id):
  190. """n小时"""
  191. # cron_id = str(uuid.uuid4())
  192. # cls.scheduler.add_job(send_charts, 'interval', id=cron_id, hours=hour, minutes=minute,
  193. # start_date=start_date, args=_args)
  194. # cls.save_job_id(instance_id, cron_id)
  195. # cls.scheduler.shutdown()
  196. @classmethod
  197. def create_job_by_n_minute(cls, _args, minute, start_date, instance_id):
  198. """n分钟"""
  199. # cron_id = str(uuid.uuid4())
  200. # cls.scheduler.add_job(send_charts, 'interval', id=cron_id, minutes=minute, start_date=start_date,
  201. # args=_args)
  202. # cls.save_job_id(instance_id, cron_id)
  203. # cls.scheduler.shutdown()
  204. @classmethod
  205. def create_job_by_n_minute_alarm(cls, minute, start_date, end_date):
  206. """告警收敛"""
  207. # cls.scheduler.add_job(early_count, 'interval', id=str(uuid.uuid4()), minutes=minute, start_date=start_date,
  208. # end_date=end_date)
  209. # cls.scheduler.shutdown()
  210. if __name__ == '__main__':
  211. # --- init ---
  212. aps = APS(db_type='mongo', db_host='192.168.30.13', db_port=7030,
  213. username='admin', password='admin', database='vms', collection='LoopTask')
  214. # --- 并不支持client方式测试 ---
  215. pass