ConnectionTest.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import socket
  2. import struct
  3. import traceback
  4. import time
  5. import sys
  6. import importlib
  7. # --- for linux
  8. # sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/sri-server-bg03')
  9. # sys.path.append('/home/server/repositories/repositories/sri-project.demo-py/3rdparty')
  10. # host, port = '127.0.0.1', 20916
  11. # host, port = '127.0.0.1', 20917
  12. # --- for windows
  13. sys.path.append(r'E:\casper\repositories\repositories\sri-project.demo-py\sri-server-bg03')
  14. sys.path.append(r'E:\casper\repositories\repositories\sri-project.demo-py\3rdparty')
  15. host, port = '58.34.94.178', 20916
  16. # host, port = '58.34.94.178', 20917
  17. protobuf = importlib.import_module(f"xprotobuf.protocol_pb2")
  18. methods = importlib.import_module(f"xlib")
  19. def test():
  20. head_sequence = '<hh' # 字节序规则
  21. head_size = struct.calcsize(head_sequence)
  22. try:
  23. client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  24. # client.bind(('localhost', port))
  25. client.connect((host, port))
  26. # --- send
  27. object = protobuf.CSSign()
  28. object.account = 'admin'
  29. object.password = '123456'
  30. head = protobuf.CS_Sign, object.ByteSize()
  31. head_bytestream = struct.pack(head_sequence, *head)
  32. body_bytestream = object.SerializeToString()
  33. send_bytestream = head_bytestream + body_bytestream
  34. print(f"Sent data: {send_bytestream}")
  35. client.sendall(send_bytestream)
  36. # 等待接收
  37. count = 0
  38. while True:
  39. # --- 解析数据头
  40. head_bytestream = client.recv(head_size)
  41. if not head_bytestream:
  42. continue
  43. command_id, body_length = struct.unpack(head_sequence, head_bytestream)
  44. methods.debug_log('Connection.284', f"Received values: {command_id, body_length}")
  45. # --- 解析数据体
  46. body_bytestream = client.recv(body_length)
  47. object = protobuf.SCSign()
  48. object.ParseFromString(body_bytestream)
  49. print(f"Received Response: ret={object.ret}, name={object.name}")
  50. # --- debug
  51. count += 1
  52. print(f"count: {count}")
  53. # --- 发送1次就退出
  54. # break
  55. # --- 发送1次后等待
  56. continue
  57. except Exception as exception:
  58. print(f"#exception: {exception.__class__.__name__}")
  59. print(f"#traceback: {traceback.format_exc()}")
  60. finally:
  61. client.close()
# Run the connection test only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    test()