MQTTV3112.py 30 KB


  1. """
  2. *******************************************************************
  3. Copyright (c) 2013, 2014 IBM Corp.
  4. All rights reserved. This program and the accompanying materials
  5. are made available under the terms of the Eclipse Public License v2.0
  6. and Eclipse Distribution License v1.0 which accompany this distribution.
  7. The Eclipse Public License is available at
  8. https://www.eclipse.org/legal/epl-2.0/
  9. and the Eclipse Distribution License is available at
  10. http://www.eclipse.org/org/documents/edl-v10.php.
  11. Contributors:
  12. Ian Craggs - initial implementation and/or documentation
  13. *******************************************************************
  14. """
  15. from __future__ import print_function
  16. """
  17. Assertions are used to validate incoming data, but are omitted from outgoing packets. This is
  18. so that the tests that use this package can send invalid data for error testing.
  19. """
  20. import logging
  21. logger = logging.getLogger("mqttsas")
  22. # Low-level protocol interface
  23. class MQTTException(Exception):
  24. pass
  25. # Message types
  26. CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, \
  27. PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, \
  28. PINGREQ, PINGRESP, DISCONNECT = range(1, 15)
  29. packetNames = [ "reserved", \
  30. "Connect", "Connack", "Publish", "Puback", "Pubrec", "Pubrel", \
  31. "Pubcomp", "Subscribe", "Suback", "Unsubscribe", "Unsuback", \
  32. "Pingreq", "Pingresp", "Disconnect"]
  33. classNames = [ "reserved", \
  34. "Connects", "Connacks", "Publishes", "Pubacks", "Pubrecs", "Pubrels", \
  35. "Pubcomps", "Subscribes", "Subacks", "Unsubscribes", "Unsubacks", \
  36. "Pingreqs", "Pingresps", "Disconnects"]
  37. def MessageType(byte):
  38. if byte != None:
  39. rc = ord(byte[0]) >> 4
  40. else:
  41. rc = None
  42. return rc
  43. def getPacket(aSocket):
  44. "receive the next packet"
  45. buf = aSocket.recv(1) # get the first byte fixed header
  46. if buf == b"":
  47. return None
  48. if str(aSocket).find("[closed]") != -1:
  49. closed = True
  50. else:
  51. closed = False
  52. if closed:
  53. return None
  54. # now get the remaining length
  55. multiplier = 1
  56. remlength = 0
  57. while 1:
  58. next = aSocket.recv(1)
  59. while len(next) == 0:
  60. next = aSocket.recv(1)
  61. buf += next
  62. digit = ord(buf[-1])
  63. remlength += (digit & 127) * multiplier
  64. if digit & 128 == 0:
  65. break
  66. multiplier *= 128
  67. # receive the remaining length if there is any
  68. rest = ''
  69. if remlength > 0:
  70. while len(rest) < remlength:
  71. rest += aSocket.recv(remlength-len(rest))
  72. assert len(rest) == remlength
  73. return buf + rest
  74. class FixedHeaders:
  75. def __init__(self, aMessageType):
  76. self.MessageType = aMessageType
  77. self.DUP = False
  78. self.QoS = 0
  79. self.RETAIN = False
  80. self.remainingLength = 0
  81. def __eq__(self, fh):
  82. return self.MessageType == fh.MessageType and \
  83. self.DUP == fh.DUP and \
  84. self.QoS == fh.QoS and \
  85. self.RETAIN == fh.RETAIN # and \
  86. # self.remainingLength == fh.remainingLength
  87. def __repr__(self):
  88. "return printable representation of our data"
  89. return classNames[self.MessageType]+'(DUP='+repr(self.DUP)+ \
  90. ", QoS="+repr(self.QoS)+", Retain="+repr(self.RETAIN)
  91. def pack(self, length):
  92. "pack data into string buffer ready for transmission down socket"
  93. buffer = bytes([(self.MessageType << 4) | (self.DUP << 3) |\
  94. (self.QoS << 1) | self.RETAIN])
  95. self.remainingLength = length
  96. buffer += self.encode(length)
  97. return buffer
  98. def encode(self, x):
  99. assert 0 <= x <= 268435455
  100. buffer = b''
  101. while 1:
  102. digit = x % 128
  103. x //= 128
  104. if x > 0:
  105. digit |= 0x80
  106. buffer += bytes([digit])
  107. if x == 0:
  108. break
  109. return buffer
  110. def unpack(self, buffer):
  111. "unpack data from string buffer into separate fields"
  112. b0 = ord(buffer[0])
  113. self.MessageType = b0 >> 4
  114. self.DUP = ((b0 >> 3) & 0x01) == 1
  115. self.QoS = (b0 >> 1) & 0x03
  116. self.RETAIN = (b0 & 0x01) == 1
  117. (self.remainingLength, bytes) = self.decode(buffer[1:])
  118. return bytes + 1 # length of fixed header
  119. def decode(self, buffer):
  120. multiplier = 1
  121. value = 0
  122. bytes = 0
  123. while 1:
  124. bytes += 1
  125. digit = ord(buffer[0])
  126. buffer = buffer[1:]
  127. value += (digit & 127) * multiplier
  128. if digit & 128 == 0:
  129. break
  130. multiplier *= 128
  131. return (value, bytes)
  132. def writeInt16(length):
  133. return bytes([length // 256, length % 256])
  134. def readInt16(buf):
  135. return ord(buf[0])*256 + ord(buf[1])
  136. def writeUTF(data):
  137. # data could be a string, or bytes. If string, encode into bytes with utf-8
  138. return writeInt16(len(data)) + (data if type(data) == type(b"") else bytes(data, "utf-8"))
  139. def readUTF(buffer, maxlen):
  140. if maxlen >= 2:
  141. length = readInt16(buffer)
  142. else:
  143. raise MQTTException("Not enough data to read string length")
  144. maxlen -= 2
  145. if length > maxlen:
  146. raise MQTTException("Length delimited string too long")
  147. buf = buffer[2:2+length].decode("utf-8")
  148. logger.info("[MQTT-4.7.3-2] topic names and filters not include null")
  149. zz = buf.find("\x00") # look for null in the UTF string
  150. if zz != -1:
  151. raise MQTTException("[MQTT-1.5.3-2] Null found in UTF data "+buf)
  152. """for c in range (0xD800, 0xDFFF):
  153. zz = buf.find(chr(c)) # look for D800-DFFF in the UTF string
  154. if zz != -1:
  155. raise MQTTException("[MQTT-1.5.3-1] D800-DFFF found in UTF data "+buf)
  156. """
  157. if buf.find("\uFEFF") != -1:
  158. logger.info("[MQTT-1.5.3-3] U+FEFF in UTF string")
  159. return buf
  160. def writeBytes(buffer):
  161. return writeInt16(len(buffer)) + buffer
  162. def readBytes(buffer):
  163. length = readInt16(buffer)
  164. return buffer[2:2+length]
  165. class Packets:
  166. def pack(self):
  167. buffer = self.fh.pack(0)
  168. return buffer
  169. def __repr__(self):
  170. return repr(self.fh)
  171. def __eq__(self, packet):
  172. return self.fh == packet.fh if packet else False
  173. class Connects(Packets):
  174. def __init__(self, buffer = None):
  175. self.fh = FixedHeaders(CONNECT)
  176. # variable header
  177. self.ProtocolName = "MQTT"
  178. self.ProtocolVersion = 4
  179. self.CleanSession = True
  180. self.WillFlag = False
  181. self.WillQoS = 0
  182. self.WillRETAIN = 0
  183. self.KeepAliveTimer = 30
  184. self.usernameFlag = False
  185. self.passwordFlag = False
  186. # Payload
  187. self.ClientIdentifier = "" # UTF-8
  188. self.WillTopic = None # UTF-8
  189. self.WillMessage = None # binary
  190. self.username = None # UTF-8
  191. self.password = None # binary
  192. if buffer != None:
  193. self.unpack(buffer)
  194. def pack(self):
  195. connectFlags = bytes([(self.CleanSession << 1) | (self.WillFlag << 2) | \
  196. (self.WillQoS << 3) | (self.WillRETAIN << 5) | \
  197. (self.usernameFlag << 6) | (self.passwordFlag << 7)])
  198. buffer = writeUTF(self.ProtocolName) + bytes([self.ProtocolVersion]) + \
  199. connectFlags + writeInt16(self.KeepAliveTimer)
  200. buffer += writeUTF(self.ClientIdentifier)
  201. if self.WillFlag:
  202. buffer += writeUTF(self.WillTopic)
  203. buffer += writeBytes(self.WillMessage)
  204. if self.usernameFlag:
  205. buffer += writeUTF(self.username)
  206. if self.passwordFlag:
  207. buffer += writeBytes(self.password)
  208. buffer = self.fh.pack(len(buffer)) + buffer
  209. return buffer
  210. def unpack(self, buffer):
  211. assert len(buffer) >= 2
  212. assert MessageType(buffer) == CONNECT
  213. try:
  214. fhlen = self.fh.unpack(buffer)
  215. packlen = fhlen + self.fh.remainingLength
  216. assert len(buffer) >= packlen, "buffer length %d packet length %d" % (len(buffer), packlen)
  217. curlen = fhlen # points to after header + remaining length
  218. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  219. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS was not 0, was %d" % self.fh.QoS
  220. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  221. self.ProtocolName = readUTF(buffer[curlen:], packlen - curlen)
  222. curlen += len(self.ProtocolName) + 2
  223. assert self.ProtocolName == "MQTT", "Wrong protocol name %s" % self.ProtocolName
  224. self.ProtocolVersion = ord(buffer[curlen])
  225. curlen += 1
  226. connectFlags = ord(buffer[curlen])
  227. assert (connectFlags & 0x01) == 0, "[MQTT-3.1.2-3] reserved connect flag must be 0"
  228. self.CleanSession = ((connectFlags >> 1) & 0x01) == 1
  229. self.WillFlag = ((connectFlags >> 2) & 0x01) == 1
  230. self.WillQoS = (connectFlags >> 3) & 0x03
  231. self.WillRETAIN = (connectFlags >> 5) & 0x01
  232. self.passwordFlag = ((connectFlags >> 6) & 0x01) == 1
  233. self.usernameFlag = ((connectFlags >> 7) & 0x01) == 1
  234. curlen +=1
  235. if self.WillFlag:
  236. assert self.WillQoS in [0, 1, 2], "[MQTT-3.1.2-14] will qos must not be 3"
  237. else:
  238. assert self.WillQoS == 0, "[MQTT-3.1.2-13] will qos must be 0, if will flag is false"
  239. assert self.WillRETAIN == False, "[MQTT-3.1.2-14] will retain must be false, if will flag is false"
  240. self.KeepAliveTimer = readInt16(buffer[curlen:])
  241. curlen += 2
  242. logger.info("[MQTT-3.1.3-3] Clientid must be present, and first field")
  243. logger.info("[MQTT-3.1.3-4] Clientid must be Unicode, and between 0 and 65535 bytes long")
  244. self.ClientIdentifier = readUTF(buffer[curlen:], packlen - curlen)
  245. curlen += len(self.ClientIdentifier) + 2
  246. if self.WillFlag:
  247. self.WillTopic = readUTF(buffer[curlen:], packlen - curlen)
  248. curlen += len(self.WillTopic) + 2
  249. self.WillMessage = readBytes(buffer[curlen:])
  250. curlen += len(self.WillMessage) + 2
  251. logger.info("[[MQTT-3.1.2-9] will topic and will message fields must be present")
  252. else:
  253. self.WillTopic = self.WillMessage = None
  254. if self.usernameFlag:
  255. assert len(buffer) > curlen+2, "Buffer too short to read username length"
  256. self.username = readUTF(buffer[curlen:], packlen - curlen)
  257. curlen += len(self.username) + 2
  258. logger.info("[MQTT-3.1.2-19] username must be in payload if user name flag is 1")
  259. else:
  260. logger.info("[MQTT-3.1.2-18] username must not be in payload if user name flag is 0")
  261. assert self.passwordFlag == False, "[MQTT-3.1.2-22] password flag must be 0 if username flag is 0"
  262. if self.passwordFlag:
  263. assert len(buffer) > curlen+2, "Buffer too short to read password length"
  264. self.password = readBytes(buffer[curlen:])
  265. curlen += len(self.password) + 2
  266. logger.info("[MQTT-3.1.2-21] password must be in payload if password flag is 0")
  267. else:
  268. logger.info("[MQTT-3.1.2-20] password must not be in payload if password flag is 0")
  269. if self.WillFlag and self.usernameFlag and self.passwordFlag:
  270. logger.info("[MQTT-3.1.3-1] clientid, will topic, will message, username and password all present")
  271. assert curlen == packlen, "Packet is wrong length curlen %d != packlen %d"
  272. except:
  273. logger.exception("[MQTT-3.1.4-1] server must validate connect packet and close connection without connack if it does not conform")
  274. raise
  275. def __repr__(self):
  276. buf = repr(self.fh)+", ProtocolName="+str(self.ProtocolName)+", ProtocolVersion=" +\
  277. repr(self.ProtocolVersion)+", CleanSession="+repr(self.CleanSession) +\
  278. ", WillFlag="+repr(self.WillFlag)+", KeepAliveTimer=" +\
  279. repr(self.KeepAliveTimer)+", ClientId="+str(self.ClientIdentifier) +\
  280. ", usernameFlag="+repr(self.usernameFlag)+", passwordFlag="+repr(self.passwordFlag)
  281. if self.WillFlag:
  282. buf += ", WillQoS=" + repr(self.WillQoS) +\
  283. ", WillRETAIN=" + repr(self.WillRETAIN) +\
  284. ", WillTopic='"+ self.WillTopic +\
  285. "', WillMessage='"+str(self.WillMessage)+"'"
  286. if self.username:
  287. buf += ", username="+self.username
  288. if self.password:
  289. buf += ", password="+str(self.password)
  290. return buf+")"
  291. def __eq__(self, packet):
  292. rc = Packets.__eq__(self, packet) and \
  293. self.ProtocolName == packet.ProtocolName and \
  294. self.ProtocolVersion == packet.ProtocolVersion and \
  295. self.CleanSession == packet.CleanSession and \
  296. self.WillFlag == packet.WillFlag and \
  297. self.KeepAliveTimer == packet.KeepAliveTimer and \
  298. self.ClientIdentifier == packet.ClientIdentifier and \
  299. self.WillFlag == packet.WillFlag
  300. if rc and self.WillFlag:
  301. rc = self.WillQoS == packet.WillQoS and \
  302. self.WillRETAIN == packet.WillRETAIN and \
  303. self.WillTopic == packet.WillTopic and \
  304. self.WillMessage == packet.WillMessage
  305. return rc
  306. class Connacks(Packets):
  307. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, ReturnCode=0):
  308. self.fh = FixedHeaders(CONNACK)
  309. self.fh.DUP = DUP
  310. self.fh.QoS = QoS
  311. self.fh.Retain = Retain
  312. self.flags = 0
  313. self.returnCode = ReturnCode
  314. if buffer != None:
  315. self.unpack(buffer)
  316. def pack(self):
  317. buffer = bytes([self.flags, self.returnCode])
  318. buffer = self.fh.pack(len(buffer)) + buffer
  319. return buffer
  320. def unpack(self, buffer):
  321. assert len(buffer) >= 4
  322. assert MessageType(buffer) == CONNACK
  323. self.fh.unpack(buffer)
  324. assert self.fh.remainingLength == 2, "Connack packet is wrong length %d" % self.fh.remainingLength
  325. assert ord(buffer[2]) in [0, 1], "Connect Acknowledge Flags"
  326. self.returnCode = ord(buffer[3])
  327. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  328. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  329. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  330. def __repr__(self):
  331. return repr(self.fh)+", Session present="+str((self.flags & 0x01) == 1)+", ReturnCode="+repr(self.returnCode)+")"
  332. def __eq__(self, packet):
  333. return Packets.__eq__(self, packet) and \
  334. self.returnCode == packet.returnCode
  335. class Disconnects(Packets):
  336. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False):
  337. self.fh = FixedHeaders(DISCONNECT)
  338. self.fh.DUP = DUP
  339. self.fh.QoS = QoS
  340. self.fh.Retain = Retain
  341. if buffer != None:
  342. self.unpack(buffer)
  343. def unpack(self, buffer):
  344. assert len(buffer) >= 2
  345. assert MessageType(buffer) == DISCONNECT
  346. self.fh.unpack(buffer)
  347. assert self.fh.remainingLength == 0, "Disconnect packet is wrong length %d" % self.fh.remainingLength
  348. logger.info("[MQTT-3.14.1-1] disconnect reserved bits must be 0")
  349. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  350. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  351. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  352. def __repr__(self):
  353. return repr(self.fh)+")"
  354. class Publishes(Packets):
  355. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0, TopicName="", Payload=b""):
  356. self.fh = FixedHeaders(PUBLISH)
  357. self.fh.DUP = DUP
  358. self.fh.QoS = QoS
  359. self.fh.Retain = Retain
  360. # variable header
  361. self.topicName = TopicName
  362. self.messageIdentifier = MsgId
  363. # payload
  364. self.data = Payload
  365. if buffer != None:
  366. self.unpack(buffer)
  367. def pack(self):
  368. buffer = writeUTF(self.topicName)
  369. if self.fh.QoS != 0:
  370. buffer += writeInt16(self.messageIdentifier)
  371. buffer += self.data
  372. buffer = self.fh.pack(len(buffer)) + buffer
  373. return buffer
  374. def unpack(self, buffer):
  375. assert len(buffer) >= 2
  376. assert MessageType(buffer) == PUBLISH
  377. fhlen = self.fh.unpack(buffer)
  378. assert self.fh.QoS in [0, 1, 2], "QoS in Publish must be 0, 1, or 2"
  379. packlen = fhlen + self.fh.remainingLength
  380. assert len(buffer) >= packlen
  381. curlen = fhlen
  382. try:
  383. self.topicName = readUTF(buffer[fhlen:], packlen - curlen)
  384. except UnicodeDecodeError:
  385. logger.info("[MQTT-3.3.2-1] topic name in publish must be utf-8")
  386. raise
  387. curlen += len(self.topicName) + 2
  388. if self.fh.QoS != 0:
  389. self.messageIdentifier = readInt16(buffer[curlen:])
  390. logger.info("[MQTT-2.3.1-1] packet indentifier must be in publish if QoS is 1 or 2")
  391. curlen += 2
  392. assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
  393. else:
  394. logger.info("[MQTT-2.3.1-5] no packet indentifier in publish if QoS is 0")
  395. self.messageIdentifier = 0
  396. self.data = buffer[curlen:fhlen + self.fh.remainingLength]
  397. if self.fh.QoS == 0:
  398. assert self.fh.DUP == False, "[MQTT-2.1.2-4]"
  399. return fhlen + self.fh.remainingLength
  400. def __repr__(self):
  401. rc = repr(self.fh)
  402. if self.fh.QoS != 0:
  403. rc += ", MsgId="+repr(self.messageIdentifier)
  404. rc += ", TopicName="+repr(self.topicName)+", Payload="+repr(self.data)+")"
  405. return rc
  406. def __eq__(self, packet):
  407. rc = Packets.__eq__(self, packet) and \
  408. self.topicName == packet.topicName and \
  409. self.data == packet.data
  410. if rc and self.fh.QoS != 0:
  411. rc = self.messageIdentifier == packet.messageIdentifier
  412. return rc
  413. class Pubacks(Packets):
  414. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
  415. self.fh = FixedHeaders(PUBACK)
  416. self.fh.DUP = DUP
  417. self.fh.QoS = QoS
  418. self.fh.Retain = Retain
  419. # variable header
  420. self.messageIdentifier = MsgId
  421. if buffer != None:
  422. self.unpack(buffer)
  423. def pack(self):
  424. buffer = writeInt16(self.messageIdentifier)
  425. buffer = self.fh.pack(len(buffer)) + buffer
  426. return buffer
  427. def unpack(self, buffer):
  428. assert len(buffer) >= 2
  429. assert MessageType(buffer) == PUBACK
  430. fhlen = self.fh.unpack(buffer)
  431. assert self.fh.remainingLength == 2, "Puback packet is wrong length %d" % self.fh.remainingLength
  432. assert len(buffer) >= fhlen + self.fh.remainingLength
  433. self.messageIdentifier = readInt16(buffer[fhlen:])
  434. assert self.fh.DUP == False, "[MQTT-2.1.2-1] Puback reserved bits must be 0"
  435. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] Puback reserved bits must be 0"
  436. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Puback reserved bits must be 0"
  437. return fhlen + 2
  438. def __repr__(self):
  439. return repr(self.fh)+", MsgId "+repr(self.messageIdentifier)
  440. def __eq__(self, packet):
  441. return Packets.__eq__(self, packet) and \
  442. self.messageIdentifier == packet.messageIdentifier
  443. class Pubrecs(Packets):
  444. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
  445. self.fh = FixedHeaders(PUBREC)
  446. self.fh.DUP = DUP
  447. self.fh.QoS = QoS
  448. self.fh.Retain = Retain
  449. # variable header
  450. self.messageIdentifier = MsgId
  451. if buffer != None:
  452. self.unpack(buffer)
  453. def pack(self):
  454. buffer = writeInt16(self.messageIdentifier)
  455. buffer = self.fh.pack(len(buffer)) + buffer
  456. return buffer
  457. def unpack(self, buffer):
  458. assert len(buffer) >= 2
  459. assert MessageType(buffer) == PUBREC
  460. fhlen = self.fh.unpack(buffer)
  461. assert self.fh.remainingLength == 2, "Pubrec packet is wrong length %d" % self.fh.remainingLength
  462. assert len(buffer) >= fhlen + self.fh.remainingLength
  463. self.messageIdentifier = readInt16(buffer[fhlen:])
  464. assert self.fh.DUP == False, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0"
  465. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0"
  466. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Pubrec reserved bits must be 0"
  467. return fhlen + 2
  468. def __repr__(self):
  469. return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")"
  470. def __eq__(self, packet):
  471. return Packets.__eq__(self, packet) and \
  472. self.messageIdentifier == packet.messageIdentifier
  473. class Pubrels(Packets):
  474. def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0):
  475. self.fh = FixedHeaders(PUBREL)
  476. self.fh.DUP = DUP
  477. self.fh.QoS = QoS
  478. self.fh.Retain = Retain
  479. # variable header
  480. self.messageIdentifier = MsgId
  481. if buffer != None:
  482. self.unpack(buffer)
  483. def pack(self):
  484. buffer = writeInt16(self.messageIdentifier)
  485. buffer = self.fh.pack(len(buffer)) + buffer
  486. return buffer
  487. def unpack(self, buffer):
  488. assert len(buffer) >= 2
  489. assert MessageType(buffer) == PUBREL
  490. fhlen = self.fh.unpack(buffer)
  491. assert self.fh.remainingLength == 2, "Pubrel packet is wrong length %d" % self.fh.remainingLength
  492. assert len(buffer) >= fhlen + self.fh.remainingLength
  493. self.messageIdentifier = readInt16(buffer[fhlen:])
  494. assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be False in PUBREL"
  495. assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS should be 1 in PUBREL"
  496. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN should be False in PUBREL"
  497. logger.info("[MQTT-3.6.1-1] bits in fixed header for pubrel are ok")
  498. return fhlen + 2
  499. def __repr__(self):
  500. return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")"
  501. def __eq__(self, packet):
  502. return Packets.__eq__(self, packet) and \
  503. self.messageIdentifier == packet.messageIdentifier
  504. class Pubcomps(Packets):
  505. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
  506. self.fh = FixedHeaders(PUBCOMP)
  507. self.fh.DUP = DUP
  508. self.fh.QoS = QoS
  509. self.fh.Retain = Retain
  510. # variable header
  511. self.messageIdentifier = MsgId
  512. if buffer != None:
  513. self.unpack(buffer)
  514. def pack(self):
  515. buffer = writeInt16(self.messageIdentifier)
  516. buffer = self.fh.pack(len(buffer)) + buffer
  517. return buffer
  518. def unpack(self, buffer):
  519. assert len(buffer) >= 2
  520. assert MessageType(buffer) == PUBCOMP
  521. fhlen = self.fh.unpack(buffer)
  522. assert len(buffer) >= fhlen + self.fh.remainingLength
  523. assert self.fh.remainingLength == 2, "Pubcomp packet is wrong length %d" % self.fh.remainingLength
  524. self.messageIdentifier = readInt16(buffer[fhlen:])
  525. assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be False in Pubcomp"
  526. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in Pubcomp"
  527. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in Pubcomp"
  528. return fhlen + 2
  529. def __repr__(self):
  530. return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")"
  531. def __eq__(self, packet):
  532. return Packets.__eq__(self, packet) and \
  533. self.messageIdentifier == packet.messageIdentifier
  534. class Subscribes(Packets):
  535. def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0, Data=[]):
  536. self.fh = FixedHeaders(SUBSCRIBE)
  537. self.fh.DUP = DUP
  538. self.fh.QoS = QoS
  539. self.fh.Retain = Retain
  540. # variable header
  541. self.messageIdentifier = MsgId
  542. # payload - list of topic, qos pairs
  543. self.data = Data[:]
  544. if buffer != None:
  545. self.unpack(buffer)
  546. def pack(self):
  547. buffer = writeInt16(self.messageIdentifier)
  548. for d in self.data:
  549. buffer += writeUTF(d[0]) + bytes([d[1]])
  550. buffer = self.fh.pack(len(buffer)) + buffer
  551. return buffer
  552. def unpack(self, buffer):
  553. assert len(buffer) >= 2
  554. assert MessageType(buffer) == SUBSCRIBE
  555. fhlen = self.fh.unpack(buffer)
  556. assert len(buffer) >= fhlen + self.fh.remainingLength
  557. logger.info("[MQTT-2.3.1-1] packet indentifier must be in subscribe")
  558. self.messageIdentifier = readInt16(buffer[fhlen:])
  559. assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
  560. leftlen = self.fh.remainingLength - 2
  561. self.data = []
  562. while leftlen > 0:
  563. topic = readUTF(buffer[-leftlen:], leftlen)
  564. leftlen -= len(topic) + 2
  565. qos = ord(buffer[-leftlen])
  566. assert qos in [0, 1, 2], "[MQTT-3-8.3-2] reserved bits must be zero"
  567. leftlen -= 1
  568. self.data.append((topic, qos))
  569. assert len(self.data) > 0, "[MQTT-3.8.3-1] at least one topic, qos pair must be in subscribe"
  570. assert leftlen == 0
  571. assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP must be false in subscribe"
  572. assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS must be 1 in subscribe"
  573. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN must be false in subscribe"
  574. return fhlen + self.fh.remainingLength
  575. def __repr__(self):
  576. return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\
  577. ", Data="+repr(self.data)+")"
  578. def __eq__(self, packet):
  579. return Packets.__eq__(self, packet) and \
  580. self.messageIdentifier == packet.messageIdentifier and \
  581. self.data == packet.data
  582. class Subacks(Packets):
  583. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0, Data=[]):
  584. self.fh = FixedHeaders(SUBACK)
  585. self.fh.DUP = DUP
  586. self.fh.QoS = QoS
  587. self.fh.Retain = Retain
  588. # variable header
  589. self.messageIdentifier = MsgId
  590. # payload - list of qos
  591. self.data = Data[:]
  592. if buffer != None:
  593. self.unpack(buffer)
  594. def pack(self):
  595. buffer = writeInt16(self.messageIdentifier)
  596. for d in self.data:
  597. buffer += bytes([d])
  598. buffer = self.fh.pack(len(buffer)) + buffer
  599. return buffer
  600. def unpack(self, buffer):
  601. assert len(buffer) >= 2
  602. assert MessageType(buffer) == SUBACK
  603. fhlen = self.fh.unpack(buffer)
  604. assert len(buffer) >= fhlen + self.fh.remainingLength
  605. self.messageIdentifier = readInt16(buffer[fhlen:])
  606. leftlen = self.fh.remainingLength - 2
  607. self.data = []
  608. while leftlen > 0:
  609. qos = buffer[-leftlen]
  610. assert ord(qos) in [0, 1, 2, 0x80], "[MQTT-3.9.3-2] return code in QoS must be 0, 1, 2 or 0x80, was "+ord(qos)
  611. leftlen -= 1
  612. self.data.append(qos)
  613. assert leftlen == 0
  614. assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be false in suback"
  615. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in suback"
  616. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in suback"
  617. return fhlen + self.fh.remainingLength
  618. def __repr__(self):
  619. return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\
  620. ", Data="+repr(self.data)+")"
  621. def __eq__(self, packet):
  622. return Packets.__eq__(self, packet) and \
  623. self.messageIdentifier == packet.messageIdentifier and \
  624. self.data == packet.data
  625. class Unsubscribes(Packets):
  626. def __init__(self, buffer=None, DUP=False, QoS=1, Retain=False, MsgId=0, Data=[]):
  627. self.fh = FixedHeaders(UNSUBSCRIBE)
  628. self.fh.DUP = DUP
  629. self.fh.QoS = QoS
  630. self.fh.Retain = Retain
  631. # variable header
  632. self.messageIdentifier = MsgId
  633. # payload - list of topics
  634. self.data = Data[:]
  635. if buffer != None:
  636. self.unpack(buffer)
  637. def pack(self):
  638. buffer = writeInt16(self.messageIdentifier)
  639. for d in self.data:
  640. buffer += writeUTF(d)
  641. buffer = self.fh.pack(len(buffer)) + buffer
  642. return buffer
  643. def unpack(self, buffer):
  644. assert len(buffer) >= 2
  645. assert MessageType(buffer) == UNSUBSCRIBE
  646. fhlen = self.fh.unpack(buffer)
  647. assert len(buffer) >= fhlen + self.fh.remainingLength
  648. logger.info("[MQTT-2.3.1-1] packet indentifier must be in unsubscribe")
  649. self.messageIdentifier = readInt16(buffer[fhlen:])
  650. assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
  651. leftlen = self.fh.remainingLength - 2
  652. self.data = []
  653. while leftlen > 0:
  654. topic = readUTF(buffer[-leftlen:], leftlen)
  655. leftlen -= len(topic) + 2
  656. self.data.append(topic)
  657. assert leftlen == 0
  658. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  659. assert self.fh.QoS == 1, "[MQTT-2.1.2-1]"
  660. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  661. logger.info("[MQTT-3-10.1-1] fixed header bits are 0,0,1,0")
  662. return fhlen + self.fh.remainingLength
  663. def __repr__(self):
  664. return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+\
  665. ", Data="+repr(self.data)+")"
  666. def __eq__(self, packet):
  667. return Packets.__eq__(self, packet) and \
  668. self.messageIdentifier == packet.messageIdentifier and \
  669. self.data == packet.data
  670. class Unsubacks(Packets):
  671. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False, MsgId=0):
  672. self.fh = FixedHeaders(UNSUBACK)
  673. self.fh.DUP = DUP
  674. self.fh.QoS = QoS
  675. self.fh.Retain = Retain
  676. # variable header
  677. self.messageIdentifier = MsgId
  678. if buffer != None:
  679. self.unpack(buffer)
  680. def pack(self):
  681. buffer = writeInt16(self.messageIdentifier)
  682. buffer = self.fh.pack(len(buffer)) + buffer
  683. return buffer
  684. def unpack(self, buffer):
  685. assert len(buffer) >= 2
  686. assert MessageType(buffer) == UNSUBACK
  687. fhlen = self.fh.unpack(buffer)
  688. assert len(buffer) >= fhlen + self.fh.remainingLength
  689. self.messageIdentifier = readInt16(buffer[fhlen:])
  690. assert self.messageIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
  691. self.messageIdentifier = readInt16(buffer[fhlen:])
  692. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  693. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  694. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  695. return fhlen + self.fh.remainingLength
  696. def __repr__(self):
  697. return repr(self.fh)+", MsgId="+repr(self.messageIdentifier)+")"
  698. def __eq__(self, packet):
  699. return Packets.__eq__(self, packet) and \
  700. self.messageIdentifier == packet.messageIdentifier
  701. class Pingreqs(Packets):
  702. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False):
  703. self.fh = FixedHeaders(PINGREQ)
  704. self.fh.DUP = DUP
  705. self.fh.QoS = QoS
  706. self.fh.Retain = Retain
  707. if buffer != None:
  708. self.unpack(buffer)
  709. def unpack(self, buffer):
  710. assert len(buffer) >= 2
  711. assert MessageType(buffer) == PINGREQ
  712. fhlen = self.fh.unpack(buffer)
  713. assert self.fh.remainingLength == 0
  714. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  715. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  716. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  717. return fhlen
  718. def __repr__(self):
  719. return repr(self.fh)+")"
  720. class Pingresps(Packets):
  721. def __init__(self, buffer=None, DUP=False, QoS=0, Retain=False):
  722. self.fh = FixedHeaders(PINGRESP)
  723. self.fh.DUP = DUP
  724. self.fh.QoS = QoS
  725. self.fh.Retain = Retain
  726. if buffer != None:
  727. self.unpack(buffer)
  728. def unpack(self, buffer):
  729. assert len(buffer) >= 2
  730. assert MessageType(buffer) == PINGRESP
  731. fhlen = self.fh.unpack(buffer)
  732. assert self.fh.remainingLength == 0
  733. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  734. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  735. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  736. return fhlen
  737. def __repr__(self):
  738. return repr(self.fh)+")"
  739. classes = [None, Connects, Connacks, Publishes, Pubacks, Pubrecs,
  740. Pubrels, Pubcomps, Subscribes, Subacks, Unsubscribes,
  741. Unsubacks, Pingreqs, Pingresps, Disconnects]
  742. def unpackPacket(buffer):
  743. if MessageType(buffer) != None:
  744. packet = classes[MessageType(buffer)]()
  745. packet.unpack(buffer)
  746. else:
  747. packet = None
  748. return packet
  749. if __name__ == "__main__":
  750. fh = FixedHeaders(CONNECT)
  751. tests = [0, 56, 127, 128, 8888, 16383, 16384, 65535, 2097151, 2097152,
  752. 20555666, 268435454, 268435455]
  753. for x in tests:
  754. try:
  755. assert x == fh.decode(fh.encode(x))[0]
  756. except AssertionError:
  757. print("Test failed for x =", x, fh.decode(fh.encode(x)))
  758. try:
  759. fh.decode(fh.encode(268435456))
  760. print("Error")
  761. except AssertionError:
  762. pass
  763. for packet in classes[1:]:
  764. before = str(packet())
  765. after = str(unpackPacket(packet().pack()))
  766. try:
  767. assert before == after
  768. except:
  769. print("before:", before, "\nafter:", after)
  770. print("End")