123456789101112131415161718192021222324252627282930313233343536373839 |
- import xlib as methods
- import threading
def single_method(_func):
    """Decorator that prevents overlapping executions of ``_func``.

    While one call of the decorated function is in progress, any other call
    (from another thread or a re-entrant call) is rejected immediately
    instead of running concurrently.

    Returns from the decorated function:
        * the wrapped function's own result on a normal run;
        * ``dict(code=-1)`` if the wrapped function raised an ``Exception``
          (the exception class name is reported via ``methods.debug_log``);
        * ``dict(code=1)`` if a previous call has not finished yet.
    """
    import functools

    func_name = _func.__name__
    # A non-blocking lock makes "check whether running" and "mark as running"
    # one atomic step, so two threads can never both pass the guard.
    running = threading.Lock()

    @functools.wraps(_func)
    def decorated_func(*args, **kwargs):
        if not running.acquire(blocking=False):
            methods.debug_log("@single_method", f"function {func_name!r} is not finished yet!")
            return dict(code=1)
        try:
            return _func(*args, **kwargs)
        except Exception as e:
            # Deliberate best-effort: report the failure and hand back an
            # error code rather than propagating the exception to the caller.
            methods.debug_log("@single_method", f"function '{func_name}' is {e.__class__.__name__}!")
            return dict(code=-1)
        finally:
            # Always clear the running state, including on BaseException
            # (e.g. KeyboardInterrupt), so the function cannot get stuck.
            running.release()

    return decorated_func
def new_thread(func):
    """Decorator that runs ``func`` in a new thread on every call.

    Calling the decorated function starts a ``threading.Thread`` whose target
    is ``func`` with the given positional and keyword arguments, and returns
    without waiting for it to finish.

    Returns from the decorated function:
        The started ``threading.Thread``, so callers may ``join()`` it or
        check ``is_alive()``. The return value of ``func`` itself is not
        available through this wrapper.
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t = threading.Thread(target=func, args=args, kwargs=kwargs)
        t.start()
        # Hand the thread back instead of discarding it; callers that
        # ignored the previous None return are unaffected.
        return t

    return wrapper
|