xdecorator.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839
import functools
import threading

import xlib as methods
  3. def single_method(_func):
  4. """频率控制"""
  5. cache = {}
  6. def decorated_func(*args, **kwargs):
  7. func_name = _func.__name__
  8. if func_name not in cache:
  9. cache[func_name] = 'NoRunning'
  10. if cache.get(func_name) == 'NoRunning':
  11. cache[func_name] = 'IsRunning'
  12. try:
  13. results = _func(*args, **kwargs)
  14. cache[func_name] = 'NoRunning'
  15. return results
  16. except Exception as e:
  17. cache[func_name] = 'NoRunning'
  18. methods.debug_log(f"@single_method", f"function '{func_name}' is {e.__class__.__name__}!")
  19. return dict(code=-1)
  20. else:
  21. methods.debug_log(f"@single_method", f"function {repr(func_name)} is not finished yet!")
  22. return dict(code=1)
  23. return decorated_func
  24. def new_thread(func):
  25. """启用线程方式执行"""
  26. def wrapper(*args, **kwargs):
  27. t = threading.Thread(target=func, args=args, kwargs=kwargs)
  28. t.start()
  29. return wrapper