MessageListener.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. from hub import methods, Global
  2. import threading
  3. import time
  4. import json
  5. class MessageListener(object):
  6. """
  7. """
  8. @staticmethod
  9. def decorate_method(client, userdata, message):
  10. """消息处理方法"""
  11. file_name = methods.now_string('%Y-%m-%d.log')
  12. log_file_path = f"/home/server/logs/{file_name}"
  13. log_dict = json.loads(message.payload)
  14. # methods.debug_log(f"MessageListener.20", f"#log_dict: {log_dict}")
  15. log_list = list()
  16. for i in range(1, 5):
  17. v = str(log_dict.get(str(i)))
  18. log_list.append(v)
  19. methods.write_text(log_file_path, '|'.join(log_list) + '\n', 'a')
  20. methods.debug_log(f"MessageListener.24", f"#message.payload: {json.loads(message.payload)}")
  21. @classmethod
  22. def start_check_loop(cls):
  23. # --- check ---
  24. save_dir = f"/home/server/logs"
  25. if not methods.is_dir(save_dir):
  26. out = methods.run_command(f'mkdir -p {save_dir}', callback=True)
  27. methods.debug_log('MessageListener.33', f"#out: {out}")
  28. Global.emqx.start_subscribe_loop(
  29. decorate_method=MessageListener.decorate_method,
  30. subscribe_topic='bg/log'
  31. )
  32. @classmethod
  33. def run_background(cls, background_is=True):
  34. """"""
  35. p1 = threading.Thread(target=cls.start_check_loop)
  36. p1.start()
  37. if __name__ == '__main__':
  38. # --- test ---
  39. MessageListener.run_background()