from hub import methods, Global import threading import time import json class VehicleStateMessageListener(object): @staticmethod def decorate_method(client, userdata, message): """消息处理方法""" try: # --- check --- message = json.loads(message.payload) if 'direction' not in message: methods.debug_log(f"VehicleStateMessageListener.18", f"#message: {message}") return # --- define --- check_vehicle_direction = False current_vehicle_direction = 0 direction = message.get('direction') offset = 15 # 允许偏移角度(单位:度) if 0 - offset < direction < 0 + offset: check_vehicle_direction = True current_vehicle_direction = 3 elif 90 - offset < direction < 90 + offset: check_vehicle_direction = True current_vehicle_direction = 12 elif 180 - offset < direction < 180 + offset: check_vehicle_direction = True current_vehicle_direction = 9 elif 270 - offset < direction < 270 + offset: check_vehicle_direction = True current_vehicle_direction = 6 # --- update --- """ VehicleInfo: 渣包车信息表 VehicleInfo.uuid: 标识 VehicleInfo.name: 车辆名称 VehicleInfo.address: 车辆ip VehicleInfo.state: 车辆状态 1 离线(默认值) 2 在线空闲 3 人工驾驶中 4 远程驾驶中 5 自动驾驶中 VehicleInfo.check_vehicle_direction: 当前车头方向是否符合自动驾驶条件 True 符合 False 不符合(默认值) VehicleInfo.current_vehicle_direction: 当前车头方向类型 3 围栏方向 9 渣场方向 12 维修间方向 6 维修间正对方向 0 未知(默认值) VehicleInfo.check_vehicle_coordinate: 当前车辆坐标是否符合自动驾驶条件 True 符合 False 不符合(默认值) VehicleInfo.current_vehicle_coordinate_x: 当前所在位置坐标 VehicleInfo.current_vehicle_coordinate_y: 当前所在位置坐标 VehicleInfo.current_vehicle_weight: 当前车辆负载重量 """ unique_dict = {'address': message.get('address')} update_dict = { 'state': message.get('state'), 'check_vehicle_direction': check_vehicle_direction, 'current_vehicle_direction': current_vehicle_direction, 'coordinate_x': message.get('coordinate_x'), 'coordinate_y': message.get('coordinate_y'), 'current_vehicle_weight': message.get('weight'), } Global.mdb.update_one('VehicleInfo', unique_dict, update_dict) methods.debug_log(f"VehicleStateMessageListener.38", f"#message: {message}") except Exception as exception: methods.debug_log('VehicleStateMessageListener.40', f"#exception: {exception}") methods.debug_log('VehicleStateMessageListener.40', f"#traceback: {methods.trace_log()}") @classmethod def start_check_loop(cls): """启动循环""" """ topic: hs/vehicle/state message.address: 车辆ip message.state: 车辆状态 1 离线 2 在线空闲 3 人工驾驶中 4 远程驾驶中 5 自动驾驶中 message.direction: 车头方向(场地坐标偏转角度) message.coordinate_x: 当前车辆坐标 message.coordinate_y: 当前车辆坐标 message.weight: 负载重量 """ Global.mq01.start_subscribe_loop( decorate_method=VehicleStateMessageListener.decorate_method, subscribe_topic='hs/vehicle/state' ) @classmethod def run_background(cls, background_is=True): """启动现成""" p1 = threading.Thread(target=cls.start_check_loop) p1.start() if __name__ == '__main__': # --- test --- MessageListener.run_background()