from hub import Global, methods


class JobManage(object):
    """Registers, pauses and implements the scheduled background jobs.

    Jobs are registered through ``Global.aps.create_job`` and paused through
    ``Global.aps.pause_all``; both are provided by the project's ``hub``
    module.
    """

    # --- API authentication ---
    # NOTE(review): a plaintext username/password pair used to live here as
    # commented-out code. It has been removed from the source; load
    # credentials from configuration or the environment instead, and rotate
    # the exposed password.

    @classmethod
    def run(cls):
        """Register every job that should be active."""
        # --- test schedules (kept disabled for reference) ---
        # Global.aps.create_job(func=cls.job101, trigger='interval', seconds=3)  # interval test
        # Global.aps.create_job(func=cls.job101, trigger='date', run_date='2022-02-23 18:54:00')  # one-shot test
        # Global.aps.create_job(func=cls.job301, trigger='interval', seconds=60)  # interval test
        # Global.aps.create_job(func=cls.job20102, trigger='date', run_date='2022-07-28 17:15:30')  # one-shot test

        # --- release schedules ---
        # Cron trigger with hour=22; intended to fire daily at 22:00
        # (presumably forwarded to an APScheduler cron trigger - confirm).
        Global.aps.create_job(func=cls.job101, trigger='cron', hour=22)
        # Global.aps.create_job(func=cls.job20102, trigger='interval', seconds=600)  # every 10 minutes
        # Global.aps.create_job(func=cls.job301, trigger='interval', seconds=300)  # every 5 minutes

    @classmethod
    def end(cls):
        """Pause all scheduled jobs."""
        Global.aps.pause_all()

    @staticmethod
    def job101(log_file_dir="/home/server/logs", keep_count=30):
        """Prune the log directory down to its first ``keep_count`` entries.

        The entries returned by ``methods.get_file_path_list(log_file_dir)``
        are reduced to their base names; every entry after the first
        ``keep_count`` is removed from ``log_file_dir`` if it is a file.

        NOTE(review): the job is described as "delete logs older than 30
        days", but no date is inspected here. That description only holds if
        the helper returns one log file per day, newest first - confirm the
        helper's ordering.

        Args:
            log_file_dir: Directory holding the log files. Defaults to the
                path that was previously hard-coded.
            keep_count: Number of leading entries to keep. Defaults to 30.

        Raises:
            ValueError: If ``keep_count`` is negative (a negative slice
                start would silently select the wrong entries).
        """
        if keep_count < 0:
            raise ValueError(f"keep_count must be >= 0, got {keep_count}")

        # --- list the directory ---
        file_path_list = methods.get_file_path_list(log_file_dir)
        file_name_list = [path.split('/')[-1] for path in file_path_list]

        # --- remove everything past the retained prefix ---
        for file_name in file_name_list[keep_count:]:
            # Rebuild the path from the base name so only direct children of
            # log_file_dir are ever touched.
            file_path = f"{log_file_dir}/{file_name}"
            if methods.is_file(file_path):
                methods.remove_file(file_path)