r"""Manual connection test client for the sri-dino-pyserver01 TCP server.

Signs in with a ``CSSign`` request, then prints every framed response that
arrives on the socket, decoding each body as an ``SCRobot`` message.

Wire format of one frame: a ``'<hh'`` header (two little-endian signed
16-bit integers: command id, body length) followed by ``body length`` bytes
of serialized protobuf.

Usage (enter the container, then run this script)::

    echo "Run: enter debugging" \
    && project_path="/home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01" \
    && cd ${project_path} \
    && sudo docker exec -it sri-dino-pyserver01 bash

    python3 /home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01/lib/ConnectionTest.py
"""

import socket
import struct
import traceback
import time
import sys
import importlib

# --- for linux
sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01')
sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/3rdparty')
# host, port = '127.0.0.1', 20916
host, port = '127.0.0.1', 20917

# --- for windows
# sys.path.append(r'E:\casper\repositories\repositories\sri-project.demo-py\sri-server-bg03')
# sys.path.append(r'E:\casper\repositories\repositories\sri-project.demo-py\3rdparty')
# host, port = '58.34.94.178', 20916
# host, port = '58.34.94.178', 20917

# --- import
# Loaded dynamically because the packages only become importable after the
# sys.path entries above have been added.
protobuf = importlib.import_module("xprotobuf.protocol_pb2")
methods = importlib.import_module("xlib")


def _recv_exact(sock, size):
    """Read exactly ``size`` bytes from ``sock``.

    ``socket.recv`` may return fewer bytes than requested, so keep reading
    until the whole amount has arrived.

    Args:
        sock: A connected stream socket.
        size: Number of bytes to read; ``0`` yields ``b''``.

    Returns:
        A ``bytes`` object of length ``size``.

    Raises:
        ValueError: If ``size`` is negative (the header field is signed).
        ConnectionError: If the peer closes the connection before ``size``
            bytes have been received.
    """
    if size < 0:
        raise ValueError(f"invalid receive size: {size}")

    chunks = []
    remaining = size
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            # recv() returns b'' only once the peer has closed the connection.
            raise ConnectionError(
                f"connection closed with {remaining} of {size} bytes still expected"
            )
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def test():
    """Sign in to the server and print every response until the link drops.

    Connects to ``(host, port)``, sends one ``CSSign`` request, then loops
    forever reading frames. Any exception (including the peer closing the
    connection) is printed with its traceback and ends the function; nothing
    is re-raised and nothing is returned.
    """
    head_sequence = '<hh'  # header layout / byte-order rule
    head_size = struct.calcsize(head_sequence)

    try:
        # The context manager closes the socket on every exit path and, unlike
        # a bare ``finally: client.close()``, is safe when creation itself fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            # client.bind(('localhost', port))
            client.connect((host, port))

            # --- send 2000
            request = protobuf.CSSign()
            request.account = 'admin'
            request.password = '123456'
            head = protobuf.CS_Sign, request.ByteSize()
            head_bytestream = struct.pack(head_sequence, *head)
            body_bytestream = request.SerializeToString()
            send_bytestream = head_bytestream + body_bytestream
            print(f"Sent data: {send_bytestream}")
            client.sendall(send_bytestream)

            # Wait for incoming frames.
            count = 0
            while True:
                # --- parse the header
                head_bytestream = _recv_exact(client, head_size)
                command_id, body_length = struct.unpack(head_sequence, head_bytestream)
                methods.debug_log('ConnectionTest64', f"Received values: {command_id, body_length}")

                # --- parse the body 4000
                # body_bytestream = _recv_exact(client, body_length)
                # response = protobuf.SCSign()
                # response.ParseFromString(body_bytestream)
                # methods.debug_log('ConnectionTest70', f"Received Response:")
                # methods.debug_log('ConnectionTest70', f"#ret: {response.ret}")
                # methods.debug_log('ConnectionTest70', f"#uid: {response.uid}")
                # methods.debug_log('ConnectionTest70', f"#uid: {response.name}")

                # --- parse the body 4008
                # NOTE(review): every frame is decoded as SCRobot regardless of
                # command_id — confirm the server only sends 4008 on this link.
                body_bytestream = _recv_exact(client, body_length)
                response = protobuf.SCRobot()
                response.ParseFromString(body_bytestream)
                print(f'#################### {type(response.robot)}')
                # Show at most the first two robots; slicing avoids an
                # IndexError when fewer than two are reported.
                for robot in response.robot[:2]:
                    print(f'-------------------- {robot.rid}')
                    print(f'-------------------- {robot.name}')
                    print(f'-------------------- {robot.type}')
                    print(f'-------------------- {robot.state}')

                # --- debug
                count += 1
                print(f"count: {count}")

                # --- exit after a single exchange
                # break
                # --- otherwise keep waiting for the next frame
    except Exception as exception:
        print(f"#exception: {exception.__class__.__name__}")
        print(f"#traceback: {traceback.format_exc()}")


if __name__ == "__main__":
    test()