ConnectionTest4008.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. """
  2. echo "执行:进入调试" \
  3. && project_path="/home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01" \
  4. && cd ${project_path} \
  5. && sudo docker exec -it sri-dino-pyserver01 bash
  6. python3 /home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01/lib/ConnectionTest.py
  7. """
  8. import socket
  9. import struct
  10. import traceback
  11. import time
  12. import sys
  13. import importlib
  14. # --- for linux
  15. sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/sri-dino-pyserver01')
  16. sys.path.append('/home/sri/repositories/repositories/sri-project.demo-py/3rdparty')
  17. # host, port = '127.0.0.1', 20916
  18. host, port = '127.0.0.1', 20917
  19. # --- for windows
  20. # sys.path.append(r'E:\casper\repositories\repositories\sri-project.demo-py\sri-server-bg03')
  21. # sys.path.append(r'E:\casper\repositories\repositories\sri-project.demo-py\3rdparty')
  22. # host, port = '58.34.94.178', 20916
  23. # host, port = '58.34.94.178', 20916
  24. # host, port = '58.34.94.178', 20917
  25. # --- import
  26. protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
  27. methods = importlib.import_module(f"xlib")
  28. def test():
  29. head_sequence = '<hh' # 字节序规则
  30. head_size = struct.calcsize(head_sequence)
  31. try:
  32. client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  33. # client.bind(('localhost', port))
  34. client.connect((host, port))
  35. # --- send 2000
  36. object = protobuf.CSSign()
  37. object.account = 'admin'
  38. object.password = '123456'
  39. head = protobuf.CS_Sign, object.ByteSize()
  40. head_bytestream = struct.pack(head_sequence, *head)
  41. body_bytestream = object.SerializeToString()
  42. send_bytestream = head_bytestream + body_bytestream
  43. print(f"Sent data: {send_bytestream}")
  44. client.sendall(send_bytestream)
  45. # 等待接收
  46. count = 0
  47. while True:
  48. # --- 解析数据头
  49. head_bytestream = client.recv(head_size)
  50. if not head_bytestream:
  51. continue
  52. command_id, body_length = struct.unpack(head_sequence, head_bytestream)
  53. methods.debug_log('ConnectionTest64', f"Received values: {command_id, body_length}")
  54. # --- 解析数据体 4000
  55. # body_bytestream = client.recv(body_length)
  56. # object = protobuf.SCSign()
  57. # object.ParseFromString(body_bytestream)
  58. # methods.debug_log('ConnectionTest70', f"Received Response:")
  59. # methods.debug_log('ConnectionTest70', f"#ret: {object.ret}")
  60. # methods.debug_log('ConnectionTest70', f"#uid: {object.uid}")
  61. # methods.debug_log('ConnectionTest70', f"#uid: {object.name}")
  62. # --- 解析数据体 4008
  63. body_bytestream = client.recv(body_length)
  64. object = protobuf.SCRobot()
  65. object.ParseFromString(body_bytestream)
  66. print(f'#################### {type(object.robot)}')
  67. print(f'-------------------- {object.robot[0].rid}')
  68. print(f'-------------------- {object.robot[0].name}')
  69. print(f'-------------------- {object.robot[0].type}')
  70. print(f'-------------------- {object.robot[0].state}')
  71. print(f'-------------------- {object.robot[1].rid}')
  72. print(f'-------------------- {object.robot[1].name}')
  73. print(f'-------------------- {object.robot[1].type}')
  74. print(f'-------------------- {object.robot[1].state}')
  75. # --- debug
  76. count += 1
  77. print(f"count: {count}")
  78. # --- 发送1次就退出
  79. # break
  80. # --- 发送1次后等待
  81. continue
  82. except Exception as exception:
  83. print(f"#exception: {exception.__class__.__name__}")
  84. print(f"#traceback: {traceback.format_exc()}")
  85. finally:
  86. client.close()
  87. if __name__ == "__main__":
  88. test()