import functools
import threading

import xlib as methods


def single_method(_func):
    """Allow at most one execution of the decorated function at a time.

    While a call is in progress, any further call (from another thread, or
    re-entrantly from the same thread) is rejected immediately instead of
    waiting.

    The wrapper returns:
        * the wrapped function's own result on success;
        * ``dict(code=-1)`` if the wrapped function raised an ``Exception``
          (the exception class name is logged, the exception is swallowed);
        * ``dict(code=1)`` if a previous call has not finished yet.
    """
    # One lock per decorated function. A lock (rather than a flag in a dict)
    # makes the "is it running? then mark it running" step atomic, so two
    # threads can never both slip past the check.
    running = threading.Lock()
    func_name = getattr(_func, "__name__", repr(_func))

    @functools.wraps(_func)
    def decorated_func(*args, **kwargs):
        # Non-blocking acquire: fail fast when another call holds the lock.
        if not running.acquire(blocking=False):
            methods.debug_log("@single_method", f"function {repr(func_name)} is not finished yet!")
            return dict(code=1)
        try:
            return _func(*args, **kwargs)
        except Exception as e:
            methods.debug_log("@single_method", f"function '{func_name}' is {e.__class__.__name__}!")
            return dict(code=-1)
        finally:
            # Always release, even on BaseException (KeyboardInterrupt,
            # SystemExit), so the function can never get stuck as "running".
            running.release()

    return decorated_func


def new_thread(func):
    """Run the decorated function in a new thread on every call.

    The wrapper starts a ``threading.Thread`` targeting ``func`` with the
    given arguments and returns ``None`` right away without waiting for the
    thread to finish; the wrapped function's return value is not passed back
    to the caller.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t = threading.Thread(target=func, args=args, kwargs=kwargs)
        t.start()

    return wrapper