# MessageListener.py
import json
import logging
import os
import threading
import time

from hub import methods, Global
  6. # --- for linux
  7. # save_dir = f"/home/server/logs"
  8. # --- for windows
  9. # save_dir = r'C:\SRI-DINO.Server-py\logs' # sri内网测试环境
  10. save_dir = r'C:\temp' # sri内网测试环境
  11. class MessageListener(object):
  12. """
  13. """
  14. @staticmethod
  15. def decorate_method(client, userdata, message):
  16. """消息处理方法"""
  17. # --- log ---
  18. # print(f'MessageListener24: #message.payload: {message.payload}', flush=True)
  19. # --- save log ---
  20. file_name = methods.now_string('%Y-%m-%d-%H.log')
  21. # log_file_path = f"{save_dir}/{file_name}"
  22. log_file_path = os.path.join(save_dir, file_name)
  23. log_dict = json.loads(message.payload)
  24. log_list = list()
  25. # item_count = 8 # 小车项目
  26. item_count = 29 # 湛江项目
  27. for i in range(1, item_count + 1):
  28. v = str(log_dict.get(str(i)))
  29. log_list.append(v)
  30. methods.write_text(log_file_path, '|'.join(log_list) + '\n', 'a')
  31. # methods.debug_log(f"MessageListener38", f"#message.payload: {json.loads(message.payload)}")
  32. @classmethod
  33. def start_check_loop(cls):
  34. """"""
  35. # --- check ---
  36. # if not methods.is_dir(save_dir):
  37. # out = methods.run_command(f'mkdir -p {save_dir}', callback=True)
  38. # methods.debug_log('MessageListener46', f"#out: {out}")
  39. Global.emqx.start_subscribe_loop(
  40. decorate_method=MessageListener.decorate_method,
  41. subscribe_topic='bg/log'
  42. )
  43. # @classmethod
  44. # def run_background(cls, background_is=True):
  45. # """"""
  46. # p1 = threading.Thread(target=cls.start_check_loop)
  47. # p1.start()
  48. @classmethod
  49. def run(cls, background_is=True):
  50. thread_list = [
  51. threading.Thread(target=cls.start_check_loop),
  52. ]
  53. for thread in thread_list:
  54. thread.setDaemon(True)
  55. thread.start()
  56. if background_is:
  57. return
  58. for thread in thread_list:
  59. thread.join()
  60. if __name__ == '__main__':
  61. # --- test ---
  62. MessageListener.run()