MessageListener.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. from hub import methods, Global
  2. import threading
  3. import time
  4. import json
  5. class MessageListener(object):
  6. """音柱循环检查"""
  7. @staticmethod
  8. def decorate_method(client, userdata, message):
  9. """消息处理方法"""
  10. file_name = methods.now_string('%Y-%m-%d.log')
  11. log_file_path = f"/home/server/logs/{file_name}"
  12. log_dict = json.loads(message.payload)
  13. # methods.debug_log(f"MessageListener.20", f"#log_dict: {log_dict}")
  14. log_list = list()
  15. for i in range(1, 5):
  16. v = str(log_dict.get(str(i)))
  17. log_list.append(v)
  18. methods.write_text(log_file_path, '|'.join(log_list) + '\n', 'a')
  19. methods.debug_log(f"MessageListener.24", f"#message.payload: {json.loads(message.payload)}")
  20. @classmethod
  21. def start_check_loop(cls):
  22. # --- check ---
  23. save_dir = f"/home/server/logs"
  24. if not methods.is_dir(save_dir):
  25. out = methods.run_command(f'mkdir -p {save_dir}', callback=True)
  26. methods.debug_log('MessageListener.33', f"#out: {out}")
  27. Global.emqx.start_subscribe_loop(
  28. decorate_method=MessageListener.decorate_method,
  29. subscribe_topic='bg/log'
  30. )
  31. @classmethod
  32. def run_background(cls, background_is=True):
  33. """"""
  34. p1 = threading.Thread(target=cls.start_check_loop)
  35. p1.start()
  36. if __name__ == '__main__':
  37. # --- test ---
  38. MessageListener.run_background()