from hub import methods, Global
import threading
import traceback
import time
import json
import os
from lib.UserWorkRecord import UserWorkRecord


# Field names written to the log for each payload section, in output order.
_C001_BASE_FIELDS = (
    'directSwitch', 'eStop', 'gearControl', 'hazardLight', 'keyStatus',
    'parkControl', 'silencedAlarm', 'travelLight', 'travelMode', 'vehicleHorn',
)
# NOTE(review): 'gearCaontrol' is the key the original code reads from the
# vehicle payload; it looks like a typo of 'gearControl' but is kept because
# it presumably mirrors the publisher's spelling -- confirm against the sender.
_V001_BASE_FIELDS = (
    'directSwitch', 'eStop', 'gearCaontrol', 'hazardLight', 'keyStatus',
    'parkControl', 'silencedAlarm', 'travelLight', 'travelMode', 'vehicleHorn',
)
_DRIVE_FIELDS = ('brakePedal', 'steeringWheel', 'turnMode', 'turnSignal')
_ERR_FIELDS = (
    'errAccPedal', 'errBasOperation', 'errBrakePedal', 'errEndTool',
    'errHandle', 'errOther', 'errSteeringWheel',
)
_TASK_FIELDS = (
    'baseLegControl', 'baseLegSwitch', 'bypassSwitch', 'cabLift', 'coopSignal',
    'enableHydraulic', 'endJoint', 'esCabLift', 'suckerSelect', 'taskJoint_1',
    'taskJoint_2', 'taskJoint_3', 'toolControl', 'workLight',
)


def _format_fields(section, field_names):
    """Return ``"name: value"`` strings for *field_names* read from *section*.

    A field missing from *section* is rendered as ``"name: None"``.
    """
    return [f"{name}: {section.get(name)}" for name in field_names]


def _build_c001_fields(log_dict):
    """Return the control-field strings for a cockpit (c001) payload.

    Each section is read as a plain dict. Raises AttributeError when a section
    (or the ``accPedal`` string) is missing.
    """
    driver = log_dict.get('driverControl')
    return (
        _format_fields(log_dict.get('basControl'), _C001_BASE_FIELDS)
        # '||' in the pedal value is rewritten to 'OR' before logging.
        + [f"accPedal: {driver.get('accPedal').replace('||', 'OR')}"]
        + _format_fields(driver, _DRIVE_FIELDS)
        + _format_fields(log_dict.get('errCode'), _ERR_FIELDS)
        + _format_fields(log_dict.get('taskControl'), _TASK_FIELDS)
    )


def _build_v001_fields(log_dict):
    """Return the control-field strings for a vehicle (v001) payload.

    Each section is read as a list whose first element is a dict, and
    ``accPedal`` is itself such a list. Raises TypeError/IndexError/
    AttributeError when a section is missing or shaped differently.
    """
    drive = log_dict.get('driveControl')[0]
    return (
        _format_fields(log_dict.get('baseControl')[0], _V001_BASE_FIELDS)
        + _format_fields(drive.get('accPedal')[0], ('accPedalF', 'accPedalH'))
        # The brake pedal value used to be written under a second
        # "accPedalH" label; it is now labelled "brakePedal".
        + _format_fields(drive, _DRIVE_FIELDS)
        + _format_fields(log_dict.get('errCode')[0], _ERR_FIELDS)
        + _format_fields(log_dict.get('taskControl')[0], _TASK_FIELDS)
    )


class TopicListener:
    """Subscribe to one topic and append its control messages to a log file.

    Attributes:
        subscribe_topic: Topic passed to the client's subscribe loop.
        method_name: Name of the message handler to use; the handlers defined
            in ``subscribe`` are ``"method_c001"`` and ``"method_v001"``.
    """

    def __init__(self, subscribe_topic, method_name):
        self.subscribe_topic = subscribe_topic
        self.method_name = method_name

    def subscribe(self):
        """Create a client and start its subscribe loop with the chosen handler.

        The handler parses each JSON payload, resolves a log uuid through
        ``UserWorkRecord.get_log_uuid`` and appends one line to
        ``<Global.save_dir>/<log_uuid>.log``. A message whose fields (ignoring
        the timestamp) equal those of the previously written message is
        skipped. If ``method_name`` matches no handler, this is logged and
        ``None`` is passed as the handler.
        """
        # Key of the last line written; shared by the handlers below.
        last_log_key = str()

        def log_failure(tag, message, exception):
            """Write payload, exception class and traceback to the debug log."""
            methods.debug_log(tag, f"#message.payload: {message.payload}")
            methods.debug_log(tag, f"#exception: {exception.__class__.__name__}")
            methods.debug_log(tag, f"#traceback: {traceback.format_exc()}")

        def save_if_changed(log_uuid, log_list):
            """Append *log_list* as one line unless it repeats the last one."""
            # Without ``nonlocal`` the assignment below would make the name
            # local and the comparison would raise UnboundLocalError.
            nonlocal last_log_key
            # The first entry (timestamp) is excluded so that only a change
            # in the control values produces a new line.
            log_key = '-'.join(log_list[1:])
            if last_log_key and last_log_key == log_key:
                return
            last_log_key = log_key
            log_file_path = os.path.join(Global.save_dir, f'{log_uuid}.log')
            methods.write_text(log_file_path, ' | '.join(log_list) + '\n', 'a')

        def method_c001(_client, _userdata, message):
            """Handle a cockpit control message.

            Sample payload (abridged)::

                {
                    'basControl': {'directSwitch': 1, 'eStop': 0, ...},
                    'driverControl': {'accPedal': '0 AND 0 || 0 AND -1',
                                      'brakePedal': '2 AND 2', ...},
                    'errCode': {'errAccPedal': 0, 'errBasOperation': '', ...},
                    'taskControl': {'baseLegControl': '103 AND 24', ...},
                    'timeStamp': 576301601,
                    'userID': 'Ego',
                    'cockpitID': 'Cop001',
                    'vehicleID': '0538'
                }
            """
            # --- parse ---
            try:
                log_dict = json.loads(message.payload)
                # 'timeStamp' is divided by 1000 before use
                # (presumably milliseconds -> seconds; TODO confirm).
                log_time = int(log_dict.get('timeStamp')) / 1000
                user_uuid = log_dict.get('userID')
                vehicle_id = log_dict.get('vehicleID')
                cockpit_id = log_dict.get('cockpitID')
            except Exception as exception:
                log_failure("method_c001|79", message, exception)
                return
            # --- resolve the target log ---
            log_uuid = UserWorkRecord.get_log_uuid(user_uuid, log_time, vehicle_id, cockpit_id)
            if not log_uuid:
                methods.debug_log(f"MessageListenerC001|error92", f"user_uuid: {user_uuid}, log_time: {log_time}")
                return
            # --- fill log_list ---
            try:
                log_list = [
                    f"timeStamp: {methods.ts_to_string(log_time)}",  # time
                    f"topic: Cockpit/CanBus/CanId001/Cop001",  # topic
                ] + _build_c001_fields(log_dict)
            except Exception as exception:
                # A payload with a missing section is logged and dropped
                # instead of raising out of the callback.
                log_failure("method_c001|79", message, exception)
                return
            # --- save ---
            save_if_changed(log_uuid, log_list)

        def method_v001(_client, _userdata, message):
            """Handle a vehicle control message.

            Unlike the cockpit payload, the keys read here are ``timestamp``
            and ``VehicleID``, and every control section is a one-element
            list wrapping a dict.
            """
            # --- parse ---
            try:
                log_dict = json.loads(message.payload)
                log_time = int(log_dict.get('timestamp')) / 1000
                user_uuid = log_dict.get('userID')
                vehicle_id = log_dict.get('VehicleID')
                cockpit_id = log_dict.get('cockpitID')
            except Exception as exception:
                log_failure("method_v001|41", message, exception)
                return
            # --- resolve the target log ---
            log_uuid = UserWorkRecord.get_log_uuid(user_uuid, log_time, vehicle_id, cockpit_id)
            if not log_uuid:
                methods.debug_log(f"MessageListenerV001|error50", f"user_uuid: {user_uuid}, log_time: {log_time}")
                return
            # --- fill log_list ---
            try:
                log_list = [
                    f"timestamp: {methods.ts_to_string(log_time)}",  # time
                    f"topic: Vehicle/ControlVehicle/Veh001",  # topic
                ] + _build_v001_fields(log_dict)
            except Exception as exception:
                log_failure("method_v001|41", message, exception)
                return
            # --- save ---
            save_if_changed(log_uuid, log_list)

        # Explicit dispatch table: ``getattr(locals(), name, None)`` looked for
        # an attribute on the dict object and therefore always returned None.
        handlers = {
            'method_c001': method_c001,
            'method_v001': method_v001,
        }
        handler = handlers.get(self.method_name)
        if handler is None:
            methods.debug_log("TopicListener|subscribe", f"#unknown method_name: {self.method_name}")

        client = Global.emqx_factory.Client(host=Global.emqx_host, port=Global.emqx_port)
        client.start_subscribe_loop(
            decorate_method=handler,
            subscribe_topic=self.subscribe_topic
        )