123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
"""
Manual TCP connection test script (lib/ConnectionTest.py in sri-dino-pyserver01).

Usage: open a shell inside the server container, then run this script from
that shell.

    echo "Run: enter debug shell" \
    && project_path="/home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01" \
    && cd ${project_path} \
    && sudo docker exec -it sri-dino-pyserver01 bash

    python3 /home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01/lib/ConnectionTest.py
"""
import importlib
import socket
import struct
import sys
import time
import traceback

# --- Linux: extend the module search path before the dynamic imports below
# (presumably these directories hold the xprotobuf and xlib packages).
sys.path.extend([
    '/home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01',
    '/home/sri/repositories/repositories/sri-project.demo-py/3rdparty',
])

# Endpoint that test() connects to. Disabled alternative: port 20916.
# host, port = '127.0.0.1', 20916
host = '127.0.0.1'
port = 20917

# --- Windows: alternative search paths and endpoints (disabled).
# sys.path.append(r'E:\casper\repositories\repositories\sri-project.demo-py\sri-server-bg03')
# sys.path.append(r'E:\casper\repositories\repositories\sri-project.demo-py\3rdparty')
# host, port = '58.34.94.178', 20916
# host, port = '58.34.94.178', 20917

# --- Project modules, loaded by name once sys.path has been extended.
protobuf = importlib.import_module("xprotobuf.protocol_pb2")
methods = importlib.import_module("xlib")
def _recv_exact(sock, size):
    """Read exactly ``size`` bytes from ``sock``.

    ``socket.recv`` may return fewer bytes than requested, so keep reading
    until the full amount has arrived.

    Returns:
        The bytes read, or ``b''`` when ``size`` is not positive or the peer
        closed the connection before sending any of them.

    Raises:
        ConnectionError: the peer closed the connection part-way through.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            if not chunks:
                return b''
            raise ConnectionError(
                f"connection closed with {remaining} of {size} bytes outstanding")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def test():
    """Send one sign-in request and print every message the server sends back.

    Connects to the module-level ``host``/``port``, sends a ``CSSign`` message
    framed with a ``'<hh'`` header (command id, body length), then reads
    framed replies in a loop until the peer closes the connection or an error
    occurs. Each reply body is parsed as ``SCRobot`` and every robot in it is
    printed.

    Exceptions are not propagated: the exception class name and traceback are
    printed instead. The socket is closed on every exit path.
    """
    head_sequence = '<hh'  # byte-order rule: little-endian, two signed 16-bit ints
    head_size = struct.calcsize(head_sequence)
    try:
        # The context manager closes the socket on every exit path and, unlike
        # a `finally: client.close()`, cannot hit an unbound name when
        # socket creation itself fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            # client.bind(('localhost', port))
            client.connect((host, port))

            # --- send the sign-in request (original note: "send 2000")
            request = protobuf.CSSign()
            request.account = 'admin'
            request.password = '123456'
            head = protobuf.CS_Sign, request.ByteSize()
            head_bytestream = struct.pack(head_sequence, *head)
            body_bytestream = request.SerializeToString()
            send_bytestream = head_bytestream + body_bytestream
            print(f"Sent data: {send_bytestream}")
            client.sendall(send_bytestream)

            # --- wait for replies
            count = 0
            while True:
                # --- parse the header
                head_bytestream = _recv_exact(client, head_size)
                if not head_bytestream:
                    # An empty read means the peer closed the connection;
                    # retrying would spin forever.
                    print("#closed: connection closed by peer")
                    break
                command_id, body_length = struct.unpack(head_sequence, head_bytestream)
                methods.debug_log('ConnectionTest64', f"Received values: {command_id, body_length}")
                if body_length < 0:
                    raise ValueError(f"negative body length in header: {body_length}")

                # --- parse the body (original note: "4008")
                body_bytestream = _recv_exact(client, body_length)
                if len(body_bytestream) != body_length:
                    raise ConnectionError("connection closed before the message body arrived")
                # NOTE(review): every reply is parsed as SCRobot regardless of
                # command_id; the original also carried a disabled SCSign
                # ("4000") parser that logged ret/uid/name. Confirm whether
                # dispatch on command_id is wanted.
                response = protobuf.SCRobot()
                response.ParseFromString(body_bytestream)
                print(f'#################### {type(response.robot)}')
                # Print every robot received instead of indexing [0] and [1],
                # which raised IndexError when fewer than two were present.
                for robot in response.robot:
                    print(f'-------------------- {robot.rid}')
                    print(f'-------------------- {robot.name}')
                    print(f'-------------------- {robot.type}')
                    print(f'-------------------- {robot.state}')

                # --- debug
                count += 1
                print(f"count: {count}")
                # To stop after the first reply, `break` here.
    except Exception as exception:
        print(f"#exception: {exception.__class__.__name__}")
        print(f"#traceback: {traceback.format_exc()}")
# Script entry point: run the connection test only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    test()
|