MQTTV5.py 54 KB


  1. """
  2. *******************************************************************
  3. Copyright (c) 2013, 2018 IBM Corp.
  4. All rights reserved. This program and the accompanying materials
  5. are made available under the terms of the Eclipse Public License v2.0
  6. and Eclipse Distribution License v1.0 which accompany this distribution.
  7. The Eclipse Public License is available at
  8. https://www.eclipse.org/legal/epl-2.0/
  9. and the Eclipse Distribution License is available at
  10. http://www.eclipse.org/org/documents/edl-v10.php.
  11. Contributors:
  12. Ian Craggs - initial implementation and/or documentation
  13. Ian Craggs - take MQTT 3.1.1 and create MQTT 5.0 version
  14. *******************************************************************
  15. """
  16. """
  17. Assertions are used to validate incoming data, but are omitted from outgoing packets. This is
  18. so that the tests that use this package can send invalid data for error testing.
  19. """
  20. import logging, struct
  21. logger = logging.getLogger('MQTTV5')
  22. # Low-level protocol interface
  23. class MQTTException(Exception):
  24. pass
  25. class MalformedPacket(MQTTException):
  26. pass
  27. class ProtocolError(MQTTException):
  28. pass
  29. MAX_PACKET_SIZE = 2**28-1
  30. MAX_PACKETID = 2**16-1
  31. class PacketTypes:
  32. indexes = range(1, 16)
  33. # Packet types
  34. CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, \
  35. PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, \
  36. PINGREQ, PINGRESP, DISCONNECT, AUTH = indexes
  37. # Dummy packet type for properties use - will delay only applies to will
  38. WILLMESSAGE = 99
  39. class Packets(object):
  40. Names = [ "reserved", \
  41. "Connect", "Connack", "Publish", "Puback", "Pubrec", "Pubrel", \
  42. "Pubcomp", "Subscribe", "Suback", "Unsubscribe", "Unsuback", \
  43. "Pingreq", "Pingresp", "Disconnect", "Auth"]
  44. classNames = [name+'es' if name == "Publish" else
  45. name+'s' if name != "reserved" else name for name in Names]
  46. def pack(self):
  47. buffer = self.fh.pack(0)
  48. return buffer
  49. def __str__(self):
  50. return str(self.fh)
  51. def __eq__(self, packet):
  52. return self.fh == packet.fh if packet else False
  53. def __setattr__(self, name, value):
  54. if name not in self.names:
  55. raise MQTTException(name + " Attribute name must be one of "+str(self.names))
  56. object.__setattr__(self, name, value)
  57. def PacketType(byte):
  58. """
  59. Retrieve the message type from the first byte of the fixed header.
  60. """
  61. if byte != None:
  62. rc = byte[0] >> 4
  63. else:
  64. rc = None
  65. return rc
  66. class ReasonCodes:
  67. """
  68. The reason code used in MQTT V5.0
  69. """
  70. def __getName__(self, packetType, identifier):
  71. """
  72. used when displaying the reason code
  73. """
  74. assert identifier in self.names.keys(), identifier
  75. names = self.names[identifier]
  76. namelist = [name for name in names.keys() if packetType in names[name]]
  77. assert len(namelist) == 1
  78. return namelist[0]
  79. def getId(self, name):
  80. """
  81. used when setting the reason code for a packetType
  82. check that only valid codes for the packet are set
  83. """
  84. identifier = None
  85. for code in self.names.keys():
  86. if name in self.names[code].keys():
  87. if self.packetType in self.names[code][name]:
  88. identifier = code
  89. break
  90. assert identifier != None, name
  91. return identifier
  92. def set(self, name):
  93. self.value = self.getId(name)
  94. def unpack(self, buffer):
  95. name = self.__getName__(self.packetType, buffer[0])
  96. self.value = self.getId(name)
  97. return 1
  98. def getName(self):
  99. return self.__getName__(self.packetType, self.value)
  100. def __str__(self):
  101. return self.getName()
  102. def pack(self):
  103. return bytes([self.value])
  104. def __init__(self, packetType, aName="Success", identifier=-1):
  105. self.packetType = packetType
  106. self.names = {
  107. 0 : { "Success" : [PacketTypes.CONNACK, PacketTypes.PUBACK,
  108. PacketTypes.PUBREC, PacketTypes.PUBREL, PacketTypes.PUBCOMP,
  109. PacketTypes.UNSUBACK, PacketTypes.AUTH],
  110. "Normal disconnection" : [PacketTypes.DISCONNECT],
  111. "Granted QoS 0" : [PacketTypes.SUBACK] },
  112. 1 : { "Granted QoS 1" : [PacketTypes.SUBACK] },
  113. 2 : { "Granted QoS 2" : [PacketTypes.SUBACK] },
  114. 4 : { "Disconnect with will message" : [PacketTypes.DISCONNECT] },
  115. 16 : { "No matching subscribers" :
  116. [PacketTypes.PUBACK, PacketTypes.PUBREC] },
  117. 17 : { "No subscription found" : [PacketTypes.UNSUBACK] },
  118. 24 : { "Continue authentication" : [PacketTypes.AUTH] },
  119. 25 : { "Re-authenticate" : [PacketTypes.AUTH] },
  120. 128 : { "Unspecified error" : [PacketTypes.CONNACK, PacketTypes.PUBACK,
  121. PacketTypes.PUBREC, PacketTypes.SUBACK, PacketTypes.UNSUBACK,
  122. PacketTypes.DISCONNECT], },
  123. 129 : { "Malformed packet" :
  124. [PacketTypes.CONNACK, PacketTypes.DISCONNECT] },
  125. 130 : { "Protocol error" :
  126. [PacketTypes.CONNACK, PacketTypes.DISCONNECT] },
  127. 131 : { "Implementation specific error": [PacketTypes.CONNACK,
  128. PacketTypes.PUBACK, PacketTypes.PUBREC, PacketTypes.SUBACK,
  129. PacketTypes.UNSUBACK, PacketTypes.DISCONNECT], },
  130. 132 : { "Unsupported protocol version" : [PacketTypes.CONNACK] },
  131. 133 : { "Client identifier not valid" : [PacketTypes.CONNACK] },
  132. 134 : { "Bad user name or password" : [PacketTypes.CONNACK] },
  133. 135 : { "Not authorized" : [PacketTypes.CONNACK, PacketTypes.PUBACK,
  134. PacketTypes.PUBREC, PacketTypes.SUBACK, PacketTypes.UNSUBACK,
  135. PacketTypes.DISCONNECT], },
  136. 136 : { "Server unavailable" : [PacketTypes.CONNACK] },
  137. 137 : { "Server busy" : [PacketTypes.CONNACK, PacketTypes.DISCONNECT] },
  138. 138 : { "Banned" : [PacketTypes.CONNACK] },
  139. 139 : { "Server shutting down" : [PacketTypes.DISCONNECT] },
  140. 140 : { "Bad authentication method" :
  141. [PacketTypes.CONNACK, PacketTypes.DISCONNECT] },
  142. 141 : { "Keep alive timeout" : [PacketTypes.DISCONNECT] },
  143. 142 : { "Session taken over" : [PacketTypes.DISCONNECT] },
  144. 143 : { "Topic filter invalid" :
  145. [PacketTypes.SUBACK, PacketTypes.UNSUBACK, PacketTypes.DISCONNECT]},
  146. 144 : { "Topic name invalid" :
  147. [PacketTypes.CONNACK, PacketTypes.PUBACK,
  148. PacketTypes.PUBREC, PacketTypes.DISCONNECT]},
  149. 145 : { "Packet identifier in use" :
  150. [PacketTypes.PUBACK, PacketTypes.PUBREC,
  151. PacketTypes.SUBACK, PacketTypes.UNSUBACK]},
  152. 146 : { "Packet identifier not found" :
  153. [PacketTypes.PUBREL, PacketTypes.PUBCOMP] },
  154. 147 : { "Receive maximum exceeded": [PacketTypes.DISCONNECT] },
  155. 148 : { "Topic alias invalid": [PacketTypes.DISCONNECT] },
  156. 149 : { "Packet too large": [PacketTypes.CONNACK, PacketTypes.DISCONNECT] },
  157. 150 : { "Message rate too high": [PacketTypes.DISCONNECT] },
  158. 151 : { "Quota exceeded": [PacketTypes.CONNACK, PacketTypes.PUBACK,
  159. PacketTypes.PUBREC, PacketTypes.SUBACK, PacketTypes.DISCONNECT], },
  160. 152 : { "Administrative action" : [PacketTypes.DISCONNECT] },
  161. 153 : { "Payload format invalid" :
  162. [PacketTypes.PUBACK, PacketTypes.PUBREC, PacketTypes.DISCONNECT]},
  163. 154 : { "Retain not supported" :
  164. [PacketTypes.CONNACK, PacketTypes.DISCONNECT] },
  165. 155 : { "QoS not supported" :
  166. [PacketTypes.CONNACK, PacketTypes.DISCONNECT] },
  167. 156 : { "Use another server" :
  168. [PacketTypes.CONNACK, PacketTypes.DISCONNECT] },
  169. 157 : { "Server moved" :
  170. [PacketTypes.CONNACK, PacketTypes.DISCONNECT] },
  171. 158 : { "Shared subscription not supported" :
  172. [PacketTypes.SUBACK, PacketTypes.DISCONNECT] },
  173. 159 : { "Connection rate exceeded" :
  174. [PacketTypes.CONNACK, PacketTypes.DISCONNECT] },
  175. 160 : { "Maximum connect time" :
  176. [PacketTypes.DISCONNECT] },
  177. 161 : { "Subscription identifiers not supported" :
  178. [PacketTypes.SUBACK, PacketTypes.DISCONNECT] },
  179. 162 : { "Wildcard subscription not supported" :
  180. [PacketTypes.SUBACK, PacketTypes.DISCONNECT] },
  181. }
  182. if identifier == -1:
  183. self.set(aName)
  184. else:
  185. self.value = identifier
  186. self.getName() # check it's good
  187. class VBIs: # Variable Byte Integer
  188. @staticmethod
  189. def encode(x):
  190. """
  191. Convert an integer 0 <= x <= 268435455 into multi-byte format.
  192. Returns the buffer convered from the integer.
  193. """
  194. assert 0 <= x <= 268435455
  195. buffer = b''
  196. while 1:
  197. digit = x % 128
  198. x //= 128
  199. if x > 0:
  200. digit |= 0x80
  201. buffer += bytes([digit])
  202. if x == 0:
  203. break
  204. return buffer
  205. @staticmethod
  206. def decode(buffer):
  207. """
  208. Get the value of a multi-byte integer from a buffer
  209. Return the value, and the number of bytes used.
  210. [MQTT-1.5.5-1] the encoded value MUST use the minimum number of bytes necessary to represent the value
  211. """
  212. multiplier = 1
  213. value = 0
  214. bytes = 0
  215. while 1:
  216. bytes += 1
  217. digit = buffer[0]
  218. buffer = buffer[1:]
  219. value += (digit & 127) * multiplier
  220. if digit & 128 == 0:
  221. break
  222. multiplier *= 128
  223. return (value, bytes)
  224. def getPacket(aSocket):
  225. "receive the next packet"
  226. buf = aSocket.recv(1) # get the first byte fixed header
  227. if buf == b"":
  228. return None
  229. if str(aSocket).find("[closed]") != -1:
  230. closed = True
  231. else:
  232. closed = False
  233. if closed:
  234. return None
  235. # now get the remaining length
  236. multiplier = 1
  237. remlength = 0
  238. while 1:
  239. next = aSocket.recv(1)
  240. while len(next) == 0:
  241. next = aSocket.recv(1)
  242. buf += next
  243. digit = buf[-1]
  244. remlength += (digit & 127) * multiplier
  245. if digit & 128 == 0:
  246. break
  247. multiplier *= 128
  248. # receive the remaining length if there is any
  249. rest = bytes([])
  250. if remlength > 0:
  251. while len(rest) < remlength:
  252. rest += aSocket.recv(remlength-len(rest))
  253. assert len(rest) == remlength
  254. return buf + rest
  255. class FixedHeaders(object):
  256. def __init__(self, aPacketType):
  257. self.PacketType = aPacketType
  258. self.DUP = False
  259. self.QoS = 0
  260. self.RETAIN = False
  261. self.remainingLength = 0
  262. def __eq__(self, fh):
  263. return self.PacketType == fh.PacketType and \
  264. self.DUP == fh.DUP and \
  265. self.QoS == fh.QoS and \
  266. self.RETAIN == fh.RETAIN # and \
  267. # self.remainingLength == fh.remainingLength
  268. def __setattr__(self, name, value):
  269. names = ["PacketType", "DUP", "QoS", "RETAIN", "remainingLength"]
  270. if name not in names:
  271. raise MQTTException(name + " Attribute name must be one of "+str(names))
  272. object.__setattr__(self, name, value)
  273. def __str__(self):
  274. "return printable representation of our data"
  275. return Packets.classNames[self.PacketType]+'(fh.DUP='+str(self.DUP)+ \
  276. ", fh.QoS="+str(self.QoS)+", fh.RETAIN="+str(self.RETAIN)
  277. def pack(self, length):
  278. "pack data into string buffer ready for transmission down socket"
  279. buffer = bytes([(self.PacketType << 4) | (self.DUP << 3) |\
  280. (self.QoS << 1) | self.RETAIN])
  281. self.remainingLength = length
  282. buffer += VBIs.encode(length)
  283. return buffer
  284. def unpack(self, buffer, maximumPacketSize):
  285. "unpack data from string buffer into separate fields"
  286. b0 = buffer[0]
  287. self.PacketType = b0 >> 4
  288. self.DUP = ((b0 >> 3) & 0x01) == 1
  289. self.QoS = (b0 >> 1) & 0x03
  290. self.RETAIN = (b0 & 0x01) == 1
  291. (self.remainingLength, bytes) = VBIs.decode(buffer[1:])
  292. if self.remainingLength + bytes + 1 > maximumPacketSize:
  293. raise ProtocolError("Packet too large")
  294. return bytes + 1 # length of fixed header
  295. def writeInt16(length):
  296. return bytes([length // 256, length % 256])
  297. def readInt16(buf):
  298. return buf[0]*256 + buf[1]
  299. def writeInt32(length):
  300. buffer = [length // 16777216]
  301. length %= 16777216
  302. buffer += [length // 65536]
  303. length %= 65536
  304. buffer += [length // 256, length % 256]
  305. return bytes(buffer)
  306. def readInt32(buf):
  307. return buf[0]*16777216 + buf[1]*65536 + buf[2]*256 + buf[3]
  308. def writeUTF(data):
  309. # data could be a string, or bytes. If string, encode into bytes with utf-8
  310. return writeInt16(len(data)) + (data if type(data) == type(b"") else bytes(data, "utf-8"))
  311. def readUTF(buffer, maxlen):
  312. if maxlen >= 2:
  313. length = readInt16(buffer)
  314. else:
  315. raise MalformedPacket("Not enough data to read string length")
  316. maxlen -= 2
  317. if length > maxlen:
  318. raise MalformedPacket("Length delimited string too long")
  319. buf = buffer[2:2+length].decode("utf-8")
  320. logger.info("[MQTT-4.7.3-2] topic names and filters must not include null")
  321. zz = buf.find("\x00") # look for null in the UTF string
  322. if zz != -1:
  323. raise MalformedPacket("[MQTT-1.5.4-2] Null found in UTF data "+buf)
  324. for c in range (0xD800, 0xDFFF):
  325. zz = buf.find(chr(c)) # look for D800-DFFF in the UTF string
  326. if zz != -1:
  327. raise MalformedPacket("[MQTT-1.5.4-1] D800-DFFF found in UTF data "+buf)
  328. if buf.find("\uFEFF") != -1:
  329. logger.info("[MQTT-1.5.4-3] U+FEFF in UTF string")
  330. return buf, length+2
  331. def writeBytes(buffer):
  332. return writeInt16(len(buffer)) + buffer
  333. def readBytes(buffer):
  334. length = readInt16(buffer)
  335. return buffer[2:2+length], length+2
  336. class Properties(object):
  337. def __init__(self, packetType):
  338. self.packetType = packetType
  339. self.types = ["Byte", "Two Byte Integer", "Four Byte Integer", "Variable Byte Integer",
  340. "Binary Data", "UTF-8 Encoded String", "UTF-8 String Pair"]
  341. self.names = {
  342. "Payload Format Indicator" : 1,
  343. "Message Expiry Interval" : 2,
  344. "Content Type" : 3,
  345. "Response Topic" : 8,
  346. "Correlation Data" : 9,
  347. "Subscription Identifier" : 11,
  348. "Session Expiry Interval" : 17,
  349. "Assigned Client Identifier" : 18,
  350. "Server Keep Alive" : 19,
  351. "Authentication Method" : 21,
  352. "Authentication Data" : 22,
  353. "Request Problem Information" : 23,
  354. "Will Delay Interval" : 24,
  355. "Request Response Information" : 25,
  356. "Response Information" : 26,
  357. "Server Reference" : 28,
  358. "Reason String" : 31,
  359. "Receive Maximum" : 33,
  360. "Topic Alias Maximum" : 34,
  361. "Topic Alias" : 35,
  362. "Maximum QoS" : 36,
  363. "Retain Available" : 37,
  364. "User Property List" : 38,
  365. "Maximum Packet Size" : 39,
  366. "Wildcard Subscription Available" : 40,
  367. "Subscription Identifier Available" : 41,
  368. "Shared Subscription Available" : 42
  369. }
  370. self.properties = {
  371. # id: type, packets
  372. 1 : (self.types.index("Byte"), [PacketTypes.PUBLISH, PacketTypes.WILLMESSAGE]), # payload format indicator
  373. 2 : (self.types.index("Four Byte Integer"), [PacketTypes.PUBLISH, PacketTypes.WILLMESSAGE]),
  374. 3 : (self.types.index("UTF-8 Encoded String"), [PacketTypes.PUBLISH, PacketTypes.WILLMESSAGE]),
  375. 8 : (self.types.index("UTF-8 Encoded String"), [PacketTypes.PUBLISH, PacketTypes.WILLMESSAGE]),
  376. 9 : (self.types.index("Binary Data"), [PacketTypes.PUBLISH, PacketTypes.WILLMESSAGE]),
  377. 11 : (self.types.index("Variable Byte Integer"),
  378. [PacketTypes.PUBLISH, PacketTypes.SUBSCRIBE]),
  379. 17 : (self.types.index("Four Byte Integer"),
  380. [PacketTypes.CONNECT, PacketTypes.CONNACK, PacketTypes.DISCONNECT]),
  381. 18 : (self.types.index("UTF-8 Encoded String"), [PacketTypes.CONNACK]),
  382. 19 : (self.types.index("Two Byte Integer"), [PacketTypes.CONNACK]),
  383. 21 : (self.types.index("UTF-8 Encoded String"),
  384. [PacketTypes.CONNECT, PacketTypes.CONNACK, PacketTypes.AUTH]),
  385. 22 : (self.types.index("Binary Data"),
  386. [PacketTypes.CONNECT, PacketTypes.CONNACK, PacketTypes.AUTH]),
  387. 23 : (self.types.index("Byte"),
  388. [PacketTypes.CONNECT]),
  389. 24 : (self.types.index("Four Byte Integer"), [PacketTypes.WILLMESSAGE]),
  390. 25 : (self.types.index("Byte"), [PacketTypes.CONNECT]),
  391. 26 : (self.types.index("UTF-8 Encoded String"), [PacketTypes.CONNACK]),
  392. 28 : (self.types.index("UTF-8 Encoded String"),
  393. [PacketTypes.CONNACK, PacketTypes.DISCONNECT]),
  394. 31 : (self.types.index("UTF-8 Encoded String"),
  395. [PacketTypes.CONNACK, PacketTypes.PUBACK, PacketTypes.PUBREC,
  396. PacketTypes.PUBREL, PacketTypes.PUBCOMP, PacketTypes.SUBACK,
  397. PacketTypes.UNSUBACK, PacketTypes.DISCONNECT, PacketTypes.AUTH]),
  398. 33 : (self.types.index("Two Byte Integer"),
  399. [PacketTypes.CONNECT, PacketTypes.CONNACK]),
  400. 34 : (self.types.index("Two Byte Integer"),
  401. [PacketTypes.CONNECT, PacketTypes.CONNACK]),
  402. 35 : (self.types.index("Two Byte Integer"), [PacketTypes.PUBLISH]),
  403. 36 : (self.types.index("Byte"), [PacketTypes.CONNACK]),
  404. 37 : (self.types.index("Byte"), [PacketTypes.CONNACK]),
  405. 38 : (self.types.index("UTF-8 String Pair"),
  406. [PacketTypes.CONNECT, PacketTypes.CONNACK,
  407. PacketTypes.PUBLISH, PacketTypes.PUBACK,
  408. PacketTypes.PUBREC, PacketTypes.PUBREL, PacketTypes.PUBCOMP,
  409. PacketTypes.SUBSCRIBE, PacketTypes.SUBACK,
  410. PacketTypes.UNSUBSCRIBE, PacketTypes.UNSUBACK,
  411. PacketTypes.DISCONNECT, PacketTypes.AUTH, PacketTypes.WILLMESSAGE]),
  412. 39 : (self.types.index("Four Byte Integer"),
  413. [PacketTypes.CONNECT, PacketTypes.CONNACK]),
  414. 40 : (self.types.index("Byte"), [PacketTypes.CONNACK]),
  415. 41 : (self.types.index("Byte"), [PacketTypes.CONNACK]),
  416. 42 : (self.types.index("Byte"), [PacketTypes.CONNACK]),
  417. }
  418. def getIdentFromName(self, compressedName):
  419. # return the identifier corresponding to the property name
  420. result = -1
  421. for name in self.names.keys():
  422. if compressedName == name.replace(' ', ''):
  423. result = self.names[name]
  424. break
  425. return result
  426. def __setattr__(self, name, value):
  427. name = name.replace(' ', '')
  428. privateVars = ["packetType", "types", "names", "properties"]
  429. if name in privateVars:
  430. object.__setattr__(self, name, value)
  431. else:
  432. # the name could have spaces in, or not. Remove spaces before assignment
  433. if name not in [name.replace(' ', '') for name in self.names.keys()]:
  434. raise MQTTException("Attribute name must be one of "+str(self.names.keys()))
  435. # check that this attribute applies to the packet type
  436. if self.packetType not in self.properties[self.getIdentFromName(name)][1]:
  437. raise MQTTException("Attribute %s does not apply to packet type %s"
  438. % (name, Packets.Names[self.packetType]) )
  439. object.__setattr__(self, name, value)
  440. def __str__(self):
  441. buffer = "["
  442. first = True
  443. for name in self.names.keys():
  444. compressedName = name.replace(' ', '')
  445. if hasattr(self, compressedName):
  446. if not first:
  447. buffer += ", "
  448. buffer += compressedName +" : "+str(getattr(self, compressedName))
  449. first = False
  450. buffer += "]"
  451. return buffer
  452. def isEmpty(self):
  453. rc = True
  454. for name in self.names.keys():
  455. compressedName = name.replace(' ', '')
  456. if hasattr(self, compressedName):
  457. rc = False
  458. break
  459. return rc
  460. def clear(self):
  461. for name in self.names.keys():
  462. compressedName = name.replace(' ', '')
  463. if hasattr(self, compressedName):
  464. delattr(self, compressedName)
  465. def writeProperty(self, identifier, type, value):
  466. buffer = b""
  467. buffer += VBIs.encode(identifier) # identifier
  468. if type == self.types.index("Byte"): # value
  469. buffer += bytes([value])
  470. elif type == self.types.index("Two Byte Integer"):
  471. buffer += writeInt16(value)
  472. elif type == self.types.index("Four Byte Integer"):
  473. buffer += writeInt32(value)
  474. elif type == self.types.index("Variable Byte Integer"):
  475. buffer += VBIs.encode(value)
  476. elif type == self.types.index("Binary Data"):
  477. buffer += writeBytes(value)
  478. elif type == self.types.index("UTF-8 Encoded String"):
  479. buffer += writeUTF(value)
  480. elif type == self.types.index("UTF-8 String Pair"):
  481. buffer += writeUTF(value[0]) + writeUTF(value[1])
  482. return buffer
  483. def pack(self):
  484. # serialize properties into buffer for sending over network
  485. buffer = b""
  486. for name in self.names.keys():
  487. compressedName = name.replace(' ', '')
  488. isList = False
  489. if compressedName.endswith('List'):
  490. isList = True
  491. if hasattr(self, compressedName):
  492. identifier = self.getIdentFromName(compressedName)
  493. attr_type = self.properties[identifier][0]
  494. if isList:
  495. for prop in getattr(self, compressedName):
  496. buffer += self.writeProperty(identifier, attr_type, prop)
  497. else:
  498. buffer += self.writeProperty(identifier, attr_type,
  499. getattr(self, compressedName))
  500. return VBIs.encode(len(buffer)) + buffer
  501. def readProperty(self, buffer, type, propslen):
  502. if type == self.types.index("Byte"):
  503. value = buffer[0]
  504. valuelen = 1
  505. elif type == self.types.index("Two Byte Integer"):
  506. value = readInt16(buffer)
  507. valuelen = 2
  508. elif type == self.types.index("Four Byte Integer"):
  509. value = readInt32(buffer)
  510. valuelen = 4
  511. elif type == self.types.index("Variable Byte Integer"):
  512. value, valuelen = VBIs.decode(buffer)
  513. elif type == self.types.index("Binary Data"):
  514. value, valuelen = readBytes(buffer)
  515. elif type == self.types.index("UTF-8 Encoded String"):
  516. value, valuelen = readUTF(buffer, propslen)
  517. elif type == self.types.index("UTF-8 String Pair"):
  518. value, valuelen = readUTF(buffer, propslen)
  519. buffer = buffer[valuelen:] # strip the bytes used by the value
  520. value1, valuelen1 = readUTF(buffer, propslen - valuelen)
  521. value = (value, value1)
  522. valuelen += valuelen1
  523. return value, valuelen
  524. def getNameFromIdent(self, identifier):
  525. rc = None
  526. for name in self.names:
  527. if self.names[name] == identifier:
  528. rc = name
  529. return rc
  530. def unpack(self, buffer):
  531. self.clear()
  532. # deserialize properties into attributes from buffer received from network
  533. propslen, VBIlen = VBIs.decode(buffer)
  534. buffer = buffer[VBIlen:] # strip the bytes used by the VBI
  535. propslenleft = propslen
  536. while propslenleft > 0: # properties length is 0 if there are none
  537. identifier, VBIlen = VBIs.decode(buffer) # property identifier
  538. buffer = buffer[VBIlen:] # strip the bytes used by the VBI
  539. propslenleft -= VBIlen
  540. attr_type = self.properties[identifier][0]
  541. value, valuelen = self.readProperty(buffer, attr_type, propslenleft)
  542. buffer = buffer[valuelen:] # strip the bytes used by the value
  543. propslenleft -= valuelen
  544. propname = self.getNameFromIdent(identifier)
  545. compressedName = propname.replace(' ', '')
  546. if propname.endswith('List'):
  547. if not hasattr(self, compressedName):
  548. setattr(self, propname, [value])
  549. else:
  550. setattr(self, propname, getattr(self, compressedName) + [value])
  551. else:
  552. if hasattr(self, compressedName):
  553. raise MQTTException("Property '%s' must not exist more than once" % property)
  554. setattr(self, propname, value)
  555. return self, propslen + VBIlen
  556. class Connects(Packets):
  557. def __init__(self, buffer = None):
  558. object.__setattr__(self, "names",
  559. ["fh", "properties", "willProperties", "ProtocolName", "ProtocolVersion",
  560. "ClientIdentifier", "CleanStart", "KeepAliveTimer",
  561. "WillFlag", "WillQoS", "WillRETAIN", "WillTopic", "WillMessage",
  562. "usernameFlag", "passwordFlag", "username", "password"])
  563. self.fh = FixedHeaders(PacketTypes.CONNECT)
  564. # variable header
  565. self.ProtocolName = "MQTT"
  566. self.ProtocolVersion = 5
  567. self.CleanStart = True
  568. self.WillFlag = False
  569. self.WillQoS = 0
  570. self.WillRETAIN = 0
  571. self.KeepAliveTimer = 30
  572. self.usernameFlag = False
  573. self.passwordFlag = False
  574. self.properties = Properties(PacketTypes.CONNECT)
  575. self.willProperties = Properties(PacketTypes.WILLMESSAGE)
  576. # Payload
  577. self.ClientIdentifier = "" # UTF-8
  578. self.WillTopic = None # UTF-8
  579. self.WillMessage = None # binary
  580. self.username = None # UTF-8
  581. self.password = None # binary
  582. #self.properties = Properties()
  583. if buffer != None:
  584. self.unpack(buffer)
  585. def pack(self):
  586. connectFlags = bytes([(self.CleanStart << 1) | (self.WillFlag << 2) | \
  587. (self.WillQoS << 3) | (self.WillRETAIN << 5) | \
  588. (self.usernameFlag << 6) | (self.passwordFlag << 7)])
  589. buffer = writeUTF(self.ProtocolName) + bytes([self.ProtocolVersion]) + \
  590. connectFlags + writeInt16(self.KeepAliveTimer)
  591. buffer += self.properties.pack()
  592. buffer += writeUTF(self.ClientIdentifier)
  593. if self.WillFlag:
  594. assert self.willProperties.packetType == PacketTypes.WILLMESSAGE
  595. buffer += self.willProperties.pack()
  596. buffer += writeUTF(self.WillTopic)
  597. buffer += writeBytes(self.WillMessage)
  598. if self.usernameFlag:
  599. buffer += writeUTF(self.username)
  600. if self.passwordFlag:
  601. buffer += writeBytes(self.password)
  602. buffer = self.fh.pack(len(buffer)) + buffer
  603. return buffer
  604. def unpack(self, buffer, maximumPacketSize):
  605. assert len(buffer) >= 2
  606. assert PacketType(buffer) == PacketTypes.CONNECT
  607. try:
  608. fhlen = self.fh.unpack(buffer, maximumPacketSize)
  609. packlen = fhlen + self.fh.remainingLength
  610. assert len(buffer) >= packlen, "buffer length %d packet length %d" % (len(buffer), packlen)
  611. curlen = fhlen # points to after header + remaining length
  612. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  613. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS was not 0, was %d" % self.fh.QoS
  614. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  615. # to allow the server to send back a CONNACK with unsupported protocol version,
  616. # the following two assertions will need to be disabled
  617. self.ProtocolName, valuelen = readUTF(buffer[curlen:], packlen - curlen)
  618. curlen += valuelen
  619. assert self.ProtocolName == "MQTT", "Wrong protocol name %s" % self.ProtocolName
  620. self.ProtocolVersion = buffer[curlen]
  621. curlen += 1
  622. assert self.ProtocolVersion == 5, "Wrong protocol version %s" % self.ProtocolVersion
  623. connectFlags = buffer[curlen]
  624. assert (connectFlags & 0x01) == 0, "[MQTT-3.1.2-3] reserved connect flag must be 0"
  625. self.CleanStart = ((connectFlags >> 1) & 0x01) == 1
  626. self.WillFlag = ((connectFlags >> 2) & 0x01) == 1
  627. self.WillQoS = (connectFlags >> 3) & 0x03
  628. self.WillRETAIN = (connectFlags >> 5) & 0x01
  629. self.passwordFlag = ((connectFlags >> 6) & 0x01) == 1
  630. self.usernameFlag = ((connectFlags >> 7) & 0x01) == 1
  631. curlen += 1
  632. if self.WillFlag:
  633. assert self.WillQoS in [0, 1, 2], "[MQTT-3.1.2-12] will qos must not be 3"
  634. else:
  635. assert self.WillQoS == 0, "[MQTT-3.1.2-11] will qos must be 0, if will flag is false"
  636. assert self.WillRETAIN == False, "[MQTT-3.1.2-13] will retain must be false, if will flag is false"
  637. self.KeepAliveTimer = readInt16(buffer[curlen:])
  638. curlen += 2
  639. curlen += self.properties.unpack(buffer[curlen:])[1]
  640. logger.info("[MQTT-3.1.3-3] Clientid must be present, and first field")
  641. logger.info("[MQTT-3.1.3-4] Clientid must be Unicode, and between 0 and 65535 bytes long")
  642. self.ClientIdentifier, valuelen = readUTF(buffer[curlen:], packlen - curlen)
  643. curlen += valuelen
  644. if self.WillFlag:
  645. curlen += self.willProperties.unpack(buffer[curlen:])[1]
  646. self.WillTopic, valuelen = readUTF(buffer[curlen:], packlen - curlen)
  647. curlen += valuelen
  648. self.WillMessage, valuelen = readBytes(buffer[curlen:])
  649. curlen += valuelen
  650. logger.info("[[MQTT-3.1.2-9] will topic and will message fields must be present")
  651. else:
  652. self.WillTopic = self.WillMessage = None
  653. if self.usernameFlag:
  654. assert len(buffer) > curlen+2, "Buffer too short to read username length"
  655. self.username, valuelen = readUTF(buffer[curlen:], packlen - curlen)
  656. curlen += valuelen
  657. logger.info("[MQTT-3.1.2-19] username must be in payload if user name flag is 1")
  658. else:
  659. logger.info("[MQTT-3.1.2-18] username must not be in payload if user name flag is 0")
  660. assert self.passwordFlag == False, "[MQTT-3.1.2-22] password flag must be 0 if username flag is 0"
  661. if self.passwordFlag:
  662. assert len(buffer) > curlen+2, "Buffer too short to read password length"
  663. self.password, valuelen = readBytes(buffer[curlen:])
  664. curlen += valuelen
  665. logger.info("[MQTT-3.1.2-21] password must be in payload if password flag is 0")
  666. else:
  667. logger.info("[MQTT-3.1.2-20] password must not be in payload if password flag is 0")
  668. if self.WillFlag and self.usernameFlag and self.passwordFlag:
  669. logger.info("[MQTT-3.1.3-1] clientid, will topic, will message, username and password all present")
  670. assert curlen == packlen, "Packet is wrong length curlen %d != packlen %d" % (curlen, packlen)
  671. except:
  672. logger.exception("[MQTT-3.1.4-1] server must validate connect packet and close connection without connack if it does not conform")
  673. raise
  674. def __str__(self):
  675. buf = str(self.fh)+", ProtocolName="+str(self.ProtocolName)+", ProtocolVersion=" +\
  676. str(self.ProtocolVersion)+", CleanStart="+str(self.CleanStart) +\
  677. ", WillFlag="+str(self.WillFlag)+", KeepAliveTimer=" +\
  678. str(self.KeepAliveTimer)+", ClientId="+str(self.ClientIdentifier) +\
  679. ", usernameFlag="+str(self.usernameFlag)+", passwordFlag="+str(self.passwordFlag)
  680. if self.WillFlag:
  681. buf += ", WillQoS=" + str(self.WillQoS) +\
  682. ", WillRETAIN=" + str(self.WillRETAIN) +\
  683. ", WillTopic='"+ self.WillTopic +\
  684. "', WillMessage='"+str(self.WillMessage)+"'"
  685. if self.username:
  686. buf += ", username="+self.username
  687. if self.password:
  688. buf += ", password="+str(self.password)
  689. buf += ", properties="+str(self.properties)
  690. return buf+")"
  691. def __eq__(self, packet):
  692. rc = Packets.__eq__(self, packet) and \
  693. self.ProtocolName == packet.ProtocolName and \
  694. self.ProtocolVersion == packet.ProtocolVersion and \
  695. self.CleanStart == packet.CleanStart and \
  696. self.WillFlag == packet.WillFlag and \
  697. self.KeepAliveTimer == packet.KeepAliveTimer and \
  698. self.ClientIdentifier == packet.ClientIdentifier and \
  699. self.WillFlag == packet.WillFlag
  700. if rc and self.WillFlag:
  701. rc = self.WillQoS == packet.WillQoS and \
  702. self.WillRETAIN == packet.WillRETAIN and \
  703. self.WillTopic == packet.WillTopic and \
  704. self.WillMessage == packet.WillMessage
  705. if rc:
  706. rc = self.properties == packet.properties
  707. return rc
  708. class Connacks(Packets):
  709. def __init__(self, buffer=None, DUP=False, QoS=0, RETAIN=False, ReasonCode="Success"):
  710. object.__setattr__(self, "names",
  711. ["fh", "sessionPresent", "reasonCode", "properties"])
  712. self.fh = FixedHeaders(PacketTypes.CONNACK)
  713. self.fh.DUP = DUP
  714. self.fh.QoS = QoS
  715. self.fh.RETAIN = RETAIN
  716. self.sessionPresent = False
  717. self.reasonCode = ReasonCodes(PacketTypes.CONNACK, ReasonCode)
  718. self.properties = Properties(PacketTypes.CONNACK)
  719. if buffer != None:
  720. self.unpack(buffer)
  721. def pack(self):
  722. flags = 0x01 if self.sessionPresent else 0x00
  723. buffer = bytes([flags])
  724. buffer += self.reasonCode.pack()
  725. buffer += self.properties.pack()
  726. buffer = self.fh.pack(len(buffer)) + buffer
  727. return buffer
  728. def unpack(self, buffer, maximumPacketSize):
  729. assert len(buffer) >= 4
  730. assert PacketType(buffer) == PacketTypes.CONNACK
  731. curlen = self.fh.unpack(buffer, maximumPacketSize)
  732. assert buffer[curlen] in [0, 1], "Connect Acknowledge Flags"
  733. self.sessionPresent = (buffer[curlen] == 0x01)
  734. curlen += 1
  735. curlen += self.reasonCode.unpack(buffer[curlen:])
  736. curlen += self.properties.unpack(buffer[curlen:])[1]
  737. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  738. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  739. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  740. def __str__(self):
  741. return str(self.fh)+", Session present="+str((self.sessionPresent & 0x01) == 1)+\
  742. ", ReturnCode="+str(self.reasonCode)+\
  743. ", properties="+str(self.properties)+")"
  744. def __eq__(self, packet):
  745. return Packets.__eq__(self, packet) and \
  746. self.reasonCode == packet.reasonCode
  747. class Disconnects(Packets):
  748. def __init__(self, buffer=None, DUP=False, QoS=0, RETAIN=False,
  749. reasonCode="Normal disconnection"):
  750. object.__setattr__(self, "names",
  751. ["fh", "DUP", "QoS", "RETAIN", "reasonCode", "properties"])
  752. self.fh = FixedHeaders(PacketTypes.DISCONNECT)
  753. self.fh.DUP = DUP
  754. self.fh.QoS = QoS
  755. self.fh.RETAIN = RETAIN
  756. # variable header
  757. self.reasonCode = ReasonCodes(PacketTypes.DISCONNECT, identifier=reasonCode)
  758. self.properties = Properties(PacketTypes.DISCONNECT)
  759. if buffer != None:
  760. self.unpack(buffer)
  761. def pack(self):
  762. buffer = b""
  763. if self.reasonCode.getName() != "Normal disconnection" or not self.properties.isEmpty():
  764. buffer += self.reasonCode.pack()
  765. if not self.properties.isEmpty():
  766. buffer += self.properties.pack()
  767. buffer = self.fh.pack(len(buffer)) + buffer
  768. return buffer
  769. def unpack(self, buffer, maximumPacketSize):
  770. self.properties.clear()
  771. self.reasonCode.set("Normal disconnection")
  772. assert len(buffer) >= 2
  773. assert PacketType(buffer) == PacketTypes.DISCONNECT
  774. fhlen = self.fh.unpack(buffer, maximumPacketSize)
  775. assert len(buffer) >= fhlen + self.fh.remainingLength
  776. assert self.fh.DUP == False, "[MQTT-2.1.2-1] DISCONNECT reserved bits must be 0"
  777. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] DISCONNECT reserved bits must be 0"
  778. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] DISCONNECT reserved bits must be 0"
  779. curlen = 0
  780. if self.fh.remainingLength > 0:
  781. self.reasonCode.unpack(buffer[curlen:])
  782. curlen += 1
  783. if self.fh.remainingLength > 1:
  784. curlen += self.properties.unpack(buffer[curlen:])[1]
  785. assert curlen == self.fh.remainingLength, \
  786. "DISCONNECT packet is wrong length %d" % self.fh.remainingLength
  787. return fhlen + self.fh.remainingLength
  788. def __str__(self):
  789. return str(self.fh)+", ReasonCode: "+str(self.reasonCode)+", Properties: "+str(self.properties)
  790. def __eq__(self, packet):
  791. return Packets.__eq__(self, packet) and \
  792. self.reasonCode == packet.reasonCode and \
  793. self.properties == packet.properties
  794. class Publishes(Packets):
  795. def __init__(self, buffer=None, DUP=False, QoS=0, RETAIN=False, MsgId=1, TopicName="", Payload=b""):
  796. object.__setattr__(self, "names",
  797. ["fh", "DUP", "QoS", "RETAIN", "topicName", "packetIdentifier",
  798. "properties", "data", "qos2state", "receivedTime"])
  799. self.fh = FixedHeaders(PacketTypes.PUBLISH)
  800. self.fh.DUP = DUP
  801. self.fh.QoS = QoS
  802. self.fh.RETAIN = RETAIN
  803. # variable header
  804. self.topicName = TopicName
  805. self.packetIdentifier = MsgId
  806. self.properties = Properties(PacketTypes.PUBLISH)
  807. # payload
  808. self.data = Payload
  809. if buffer != None:
  810. self.unpack(buffer)
  811. def pack(self):
  812. buffer = writeUTF(self.topicName)
  813. if self.fh.QoS != 0:
  814. buffer += writeInt16(self.packetIdentifier)
  815. buffer += self.properties.pack()
  816. buffer += self.data
  817. buffer = self.fh.pack(len(buffer)) + buffer
  818. return buffer
  819. def unpack(self, buffer, maximumPacketSize):
  820. assert len(buffer) >= 2
  821. assert PacketType(buffer) == PacketTypes.PUBLISH
  822. fhlen = self.fh.unpack(buffer, maximumPacketSize)
  823. assert self.fh.QoS in [0, 1, 2], "QoS in Publish must be 0, 1, or 2"
  824. packlen = fhlen + self.fh.remainingLength
  825. assert len(buffer) >= packlen
  826. curlen = fhlen
  827. try:
  828. self.topicName, valuelen = readUTF(buffer[fhlen:], packlen - curlen)
  829. except UnicodeDecodeError:
  830. logger.info("[MQTT-3.3.2-1] topic name in publish must be utf-8")
  831. raise
  832. curlen += valuelen
  833. if self.fh.QoS != 0:
  834. self.packetIdentifier = readInt16(buffer[curlen:])
  835. logger.info("[MQTT-2.3.1-1] packet indentifier must be in publish if QoS is 1 or 2")
  836. curlen += 2
  837. assert self.packetIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
  838. else:
  839. logger.info("[MQTT-2.3.1-5] no packet indentifier in publish if QoS is 0")
  840. self.packetIdentifier = 0
  841. curlen += self.properties.unpack(buffer[curlen:])[1]
  842. self.data = buffer[curlen:fhlen + self.fh.remainingLength]
  843. if self.fh.QoS == 0:
  844. assert self.fh.DUP == False, "[MQTT-2.1.2-4]"
  845. return fhlen + self.fh.remainingLength
  846. def __str__(self):
  847. rc = str(self.fh)
  848. if self.fh.QoS != 0:
  849. rc += ", PacketId="+str(self.packetIdentifier)
  850. rc += ", Properties: "+str(self.properties)
  851. rc += ", TopicName="+str(self.topicName)+", Payload="+str(self.data)+")"
  852. return rc
  853. def __eq__(self, packet):
  854. rc = Packets.__eq__(self, packet) and \
  855. self.topicName == packet.topicName and \
  856. self.data == packet.data
  857. if rc and self.fh.QoS != 0:
  858. rc = self.packetIdentifier == packet.packetIdentifier
  859. return rc
  860. class Acks(Packets):
  861. def __init__(self, ackType, buffer, DUP, QoS, RETAIN, packetId):
  862. object.__setattr__(self, "names",
  863. ["fh", "DUP", "QoS", "RETAIN", "packetIdentifier",
  864. "reasonCode", "properties"])
  865. self.fh = FixedHeaders(ackType)
  866. self.fh.DUP = DUP
  867. self.fh.QoS = QoS
  868. self.fh.RETAIN = RETAIN
  869. # variable header
  870. self.packetIdentifier = packetId
  871. self.reasonCode = ReasonCodes(ackType)
  872. self.properties = Properties(ackType)
  873. object.__setattr__(self, "ackType", ackType)
  874. object.__setattr__(self, "ackName", Packets.Names[self.ackType])
  875. if buffer != None:
  876. self.unpack(buffer)
  877. def pack(self):
  878. buffer = writeInt16(self.packetIdentifier)
  879. if self.reasonCode.getName() != "Success" or not self.properties.isEmpty():
  880. buffer += self.reasonCode.pack()
  881. if not self.properties.isEmpty():
  882. buffer += self.properties.pack()
  883. buffer = self.fh.pack(len(buffer)) + buffer
  884. return buffer
  885. def unpack(self, buffer, maximumPacketSize):
  886. self.properties.clear()
  887. self.reasonCode.set("Success")
  888. assert len(buffer) >= 2
  889. assert PacketType(buffer) == self.ackType
  890. fhlen = self.fh.unpack(buffer, maximumPacketSize)
  891. assert self.fh.remainingLength in [2, 3, 4], \
  892. "%s packet is wrong length %d" % (self.ackName, self.fh.remainingLength)
  893. assert len(buffer) >= fhlen + self.fh.remainingLength
  894. self.packetIdentifier = readInt16(buffer[fhlen:])
  895. curlen = fhlen + 2
  896. assert self.fh.DUP == False, "[MQTT-2.1.2-1] %s reserved bits must be 0" %\
  897. self.ackName
  898. if self.ackType == PacketTypes.PUBREL:
  899. assert self.fh.QoS == 1, "[MQTT-3.6.1-1] %s reserved bits must be 0010" %\
  900. self.ackName
  901. else:
  902. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] %s reserved bits must be 0" %\
  903. self.ackName
  904. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] %s reserved bits must be 0" %\
  905. self.ackName
  906. if self.fh.remainingLength > 2:
  907. self.reasonCode.unpack(buffer[curlen:])
  908. curlen += 1
  909. if self.fh.remainingLength > 3:
  910. self.properties.unpack(buffer[curlen:])
  911. return fhlen + self.fh.remainingLength
  912. def __str__(self):
  913. return str(self.fh)+", PacketId="+str(self.packetIdentifier)+")"
  914. def __eq__(self, packet):
  915. return Packets.__eq__(self, packet) and \
  916. self.packetIdentifier == packet.packetIdentifier
  917. class Pubacks(Acks):
  918. def __init__(self, buffer=None, DUP=False, QoS=0, RETAIN=False, PacketId=1):
  919. Acks.__init__(self, PacketTypes.PUBACK, buffer, DUP, QoS, RETAIN, PacketId)
  920. class Pubrecs(Acks):
  921. def __init__(self, buffer=None, DUP=False, QoS=0, RETAIN=False, PacketId=1):
  922. Acks.__init__(self, PacketTypes.PUBREC, buffer, DUP, QoS, RETAIN, PacketId)
  923. class Pubrels(Acks):
  924. def __init__(self, buffer=None, DUP=False, QoS=1, RETAIN=False, PacketId=1):
  925. Acks.__init__(self, PacketTypes.PUBREL, buffer, DUP, QoS, RETAIN, PacketId)
  926. class Pubcomps(Acks):
  927. def __init__(self, buffer=None, DUP=False, QoS=0, RETAIN=False, PacketId=1):
  928. Acks.__init__(self, PacketTypes.PUBCOMP, buffer, DUP, QoS, RETAIN, PacketId)
  929. class SubscribeOptions(object):
  930. def __init__(self, QoS=0, noLocal=False, retainAsPublished=False, retainHandling=0):
  931. object.__setattr__(self, "names",
  932. ["QoS", "noLocal", "retainAsPublished", "retainHandling"])
  933. self.QoS = QoS # bits 0,1
  934. self.noLocal = noLocal # bit 2
  935. self.retainAsPublished = retainAsPublished # bit 3
  936. self.retainHandling = retainHandling # bits 4 and 5: 0, 1 or 2
  937. def __setattr__(self, name, value):
  938. if name not in self.names:
  939. raise MQTTException(name + " Attribute name must be one of "+str(self.names))
  940. object.__setattr__(self, name, value)
  941. def pack(self):
  942. assert self.QoS in [0, 1, 2]
  943. assert self.retainHandling in [0, 1, 2]
  944. noLocal = 1 if self.noLocal else 0
  945. retainAsPublished = 1 if self.retainAsPublished else 0
  946. buffer = bytes([(self.retainHandling << 4) | (retainAsPublished << 3) |\
  947. (noLocal << 2) | self.QoS])
  948. return buffer
  949. def unpack(self, buffer):
  950. b0 = buffer[0]
  951. self.retainHandling = ((b0 >> 4) & 0x03)
  952. self.retainAsPublished = True if ((b0 >> 3) & 0x01) == 1 else False
  953. self.noLocal = True if ((b0 >> 2) & 0x01) == 1 else False
  954. self.QoS = (b0 & 0x03)
  955. assert self.retainHandling in [0, 1, 2]
  956. assert self.QoS in [0, 1, 2]
  957. return 1
  958. def __str__(self):
  959. return "{QoS="+str(self.QoS)+", noLocal="+str(self.noLocal)+\
  960. ", retainAsPublished="+str(self.retainAsPublished)+\
  961. ", retainHandling="+str(self.retainHandling)+"}"
  962. class Subscribes(Packets):
  963. def __init__(self, buffer=None, DUP=False, QoS=1, RETAIN=False, MsgId=1, Data=[]):
  964. object.__setattr__(self, "names",
  965. ["fh", "DUP", "QoS", "RETAIN", "packetIdentifier",
  966. "properties", "data"])
  967. self.fh = FixedHeaders(PacketTypes.SUBSCRIBE)
  968. self.fh.DUP = DUP
  969. self.fh.QoS = QoS
  970. self.fh.RETAIN = RETAIN
  971. # variable header
  972. self.packetIdentifier = MsgId
  973. self.properties = Properties(PacketTypes.SUBSCRIBE)
  974. # payload - list of topic, subscribe option pairs
  975. self.data = Data[:]
  976. if buffer != None:
  977. self.unpack(buffer)
  978. def pack(self):
  979. buffer = writeInt16(self.packetIdentifier)
  980. buffer += self.properties.pack()
  981. for d in self.data:
  982. buffer += writeUTF(d[0]) + d[1].pack()
  983. buffer = self.fh.pack(len(buffer)) + buffer
  984. return buffer
  985. def unpack(self, buffer, maximumPacketSize):
  986. self.properties.clear()
  987. assert len(buffer) >= 2
  988. assert PacketType(buffer) == PacketTypes.SUBSCRIBE
  989. fhlen = self.fh.unpack(buffer, maximumPacketSize)
  990. assert len(buffer) >= fhlen + self.fh.remainingLength
  991. logger.info("[MQTT-2.3.1-1] packet indentifier must be in subscribe")
  992. self.packetIdentifier = readInt16(buffer[fhlen:])
  993. assert self.packetIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
  994. leftlen = self.fh.remainingLength - 2
  995. leftlen -= self.properties.unpack(buffer[-leftlen:])[1]
  996. self.data = []
  997. while leftlen > 0:
  998. topic, topiclen = readUTF(buffer[-leftlen:], leftlen)
  999. leftlen -= topiclen
  1000. options = SubscribeOptions()
  1001. options.unpack(buffer[-leftlen:])
  1002. leftlen -= 1
  1003. self.data.append((topic, options))
  1004. assert len(self.data) > 0, "[MQTT-3.8.3-1] at least one topic, qos pair must be in subscribe"
  1005. assert leftlen == 0
  1006. assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP must be false in subscribe"
  1007. assert self.fh.QoS == 1, "[MQTT-2.1.2-1] QoS must be 1 in subscribe"
  1008. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] RETAIN must be false in subscribe"
  1009. return fhlen + self.fh.remainingLength
  1010. def __str__(self):
  1011. return str(self.fh)+", PacketId="+str(self.packetIdentifier)+\
  1012. ", Properties: "+str(self.properties)+\
  1013. ", Data="+str( [(x, str(y)) for (x, y) in self.data] ) +")"
  1014. def __eq__(self, packet):
  1015. return Packets.__eq__(self, packet) and \
  1016. self.packetIdentifier == packet.packetIdentifier and \
  1017. self.data == packet.data
  1018. class UnsubSubacks(Packets):
  1019. def __init__(self, packetType, buffer, DUP, QoS, RETAIN, PacketId, reasonCodes):
  1020. object.__setattr__(self, "names",
  1021. ["fh", "DUP", "QoS", "RETAIN", "packetIdentifier",
  1022. "reasonCodes", "properties"])
  1023. object.__setattr__(self, "packetType", packetType)
  1024. self.fh = FixedHeaders(self.packetType)
  1025. self.fh.DUP = DUP
  1026. self.fh.QoS = QoS
  1027. self.fh.RETAIN = RETAIN
  1028. # variable header
  1029. self.packetIdentifier = PacketId
  1030. self.properties = Properties(self.packetType)
  1031. # payload - list of reason codes corresponding to topics in subscribe
  1032. self.reasonCodes = reasonCodes[:]
  1033. if buffer != None:
  1034. self.unpack(buffer)
  1035. def pack(self):
  1036. buffer = writeInt16(self.packetIdentifier)
  1037. buffer += self.properties.pack()
  1038. for reasonCode in self.reasonCodes:
  1039. buffer += reasonCode.pack()
  1040. buffer = self.fh.pack(len(buffer)) + buffer
  1041. assert len(buffer) >= 3 # must have property field, even if empty
  1042. return buffer
  1043. def unpack(self, buffer, maximumPacketSize):
  1044. assert len(buffer) >= 3
  1045. assert PacketType(buffer) == self.packetType
  1046. fhlen = self.fh.unpack(buffer, maximumPacketSize)
  1047. assert len(buffer) >= fhlen + self.fh.remainingLength
  1048. self.packetIdentifier = readInt16(buffer[fhlen:])
  1049. leftlen = self.fh.remainingLength - 2
  1050. leftlen -= self.properties.unpack(buffer[-leftlen:])[1]
  1051. self.reasonCodes = []
  1052. while leftlen > 0:
  1053. if self.packetType == PacketTypes.SUBACK:
  1054. reasonCode = ReasonCodes(self.packetType, "Granted QoS 0")
  1055. else:
  1056. reasonCode = ReasonCodes(self.packetType, "Success")
  1057. reasonCode.unpack(buffer[-leftlen:])
  1058. assert reasonCode.value in [0, 1, 2, 0x80], "[MQTT-3.9.3-2] return code in QoS must be 0, 1, 2 or 0x80"
  1059. leftlen -= 1
  1060. self.reasonCodes.append(reasonCode)
  1061. assert leftlen == 0
  1062. assert self.fh.DUP == False, "[MQTT-2.1.2-1] DUP should be false in suback"
  1063. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] QoS should be 0 in suback"
  1064. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] Retain should be false in suback"
  1065. return fhlen + self.fh.remainingLength
  1066. def __str__(self):
  1067. return str(self.fh)+", PacketId="+str(self.packetIdentifier)+\
  1068. ", Properties: "+str(self.properties)+\
  1069. ", reason codes="+str([str(rc) for rc in self.reasonCodes])+")"
  1070. def __eq__(self, packet):
  1071. return Packets.__eq__(self, packet) and \
  1072. self.packetIdentifier == packet.packetIdentifier and \
  1073. self.data == packet.data
  1074. class Subacks(UnsubSubacks):
  1075. def __init__(self, buffer=None, DUP=False, QoS=0, RETAIN=False, PacketId=1, reasonCodes=[]):
  1076. UnsubSubacks.__init__(self, PacketTypes.SUBACK, buffer, DUP, QoS, RETAIN, PacketId, reasonCodes)
  1077. class Unsubscribes(Packets):
  1078. def __init__(self, buffer=None, DUP=False, QoS=1, RETAIN=False, PacketId=1, TopicFilters=[]):
  1079. object.__setattr__(self, "names",
  1080. ["fh", "DUP", "QoS", "RETAIN", "packetIdentifier", "properties", "topicFilters"])
  1081. self.fh = FixedHeaders(PacketTypes.UNSUBSCRIBE)
  1082. self.fh.DUP = DUP
  1083. self.fh.QoS = QoS
  1084. self.fh.RETAIN = RETAIN
  1085. # variable header
  1086. self.packetIdentifier = PacketId
  1087. self.properties = Properties(PacketTypes.UNSUBSCRIBE)
  1088. # payload - list of topics
  1089. self.topicFilters = TopicFilters[:]
  1090. if buffer != None:
  1091. self.unpack(buffer)
  1092. def pack(self):
  1093. buffer = writeInt16(self.packetIdentifier)
  1094. buffer += self.properties.pack()
  1095. for topicFilter in self.topicFilters:
  1096. buffer += writeUTF(topicFilter)
  1097. buffer = self.fh.pack(len(buffer)) + buffer
  1098. return buffer
  1099. def unpack(self, buffer, maximumPacketSize):
  1100. assert len(buffer) >= 2
  1101. assert PacketType(buffer) == PacketTypes.UNSUBSCRIBE
  1102. fhlen = self.fh.unpack(buffer, maximumPacketSize)
  1103. assert len(buffer) >= fhlen + self.fh.remainingLength
  1104. logger.info("[MQTT-2.3.1-1] packet indentifier must be in unsubscribe")
  1105. self.packetIdentifier = readInt16(buffer[fhlen:])
  1106. assert self.packetIdentifier > 0, "[MQTT-2.3.1-1] packet indentifier must be > 0"
  1107. leftlen = self.fh.remainingLength - 2
  1108. leftlen -= self.properties.unpack(buffer[-leftlen:])[1]
  1109. self.topicFilters = []
  1110. while leftlen > 0:
  1111. topic, topiclen = readUTF(buffer[-leftlen:], leftlen)
  1112. leftlen -= topiclen
  1113. self.topicFilters.append(topic)
  1114. assert leftlen == 0
  1115. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  1116. assert self.fh.QoS == 1, "[MQTT-2.1.2-1]"
  1117. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  1118. logger.info("[MQTT-3-10.1-1] fixed header bits are 0,0,1,0")
  1119. return fhlen + self.fh.remainingLength
  1120. def __str__(self):
  1121. return str(self.fh)+", PacketId="+str(self.packetIdentifier)+\
  1122. ", Properties: "+str(self.properties)+\
  1123. ", Data="+str(self.topicFilters)+")"
  1124. def __eq__(self, packet):
  1125. return Packets.__eq__(self, packet) and \
  1126. self.packetIdentifier == packet.packetIdentifier and \
  1127. self.topicFilters == packet.topicFilters
  1128. class Unsubacks(UnsubSubacks):
  1129. def __init__(self, buffer=None, DUP=False, QoS=0, RETAIN=False, PacketId=1, reasonCodes=[]):
  1130. UnsubSubacks.__init__(self, PacketTypes.UNSUBACK, buffer, DUP, QoS, RETAIN,
  1131. PacketId, reasonCodes)
  1132. class Pingreqs(Packets):
  1133. def __init__(self, buffer=None, DUP=False, QoS=0, RETAIN=False):
  1134. object.__setattr__(self, "names", ["fh", "DUP", "QoS", "RETAIN"])
  1135. self.fh = FixedHeaders(PacketTypes.PINGREQ)
  1136. self.fh.DUP = DUP
  1137. self.fh.QoS = QoS
  1138. self.fh.RETAIN = RETAIN
  1139. if buffer != None:
  1140. self.unpack(buffer)
  1141. def unpack(self, buffer, maximumPacketSize):
  1142. assert len(buffer) >= 2
  1143. assert PacketType(buffer) == PacketTypes.PINGREQ
  1144. fhlen = self.fh.unpack(buffer, maximumPacketSize)
  1145. assert self.fh.remainingLength == 0
  1146. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  1147. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  1148. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  1149. return fhlen
  1150. def __str__(self):
  1151. return str(self.fh)+")"
  1152. class Pingresps(Packets):
  1153. def __init__(self, buffer=None, DUP=False, QoS=0, RETAIN=False):
  1154. object.__setattr__(self, "names", ["fh", "DUP", "QoS", "RETAIN"])
  1155. self.fh = FixedHeaders(PacketTypes.PINGRESP)
  1156. self.fh.DUP = DUP
  1157. self.fh.QoS = QoS
  1158. self.fh.RETAIN = RETAIN
  1159. if buffer != None:
  1160. self.unpack(buffer)
  1161. def unpack(self, buffer, maximumPacketSize):
  1162. assert len(buffer) >= 2
  1163. assert PacketType(buffer) == PacketTypes.PINGRESP
  1164. fhlen = self.fh.unpack(buffer, maximumPacketSize)
  1165. assert self.fh.remainingLength == 0
  1166. assert self.fh.DUP == False, "[MQTT-2.1.2-1]"
  1167. assert self.fh.QoS == 0, "[MQTT-2.1.2-1]"
  1168. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1]"
  1169. return fhlen
  1170. def __str__(self):
  1171. return str(self.fh)+")"
  1172. class Disconnects(Packets):
  1173. def __init__(self, buffer=None, DUP=False, QoS=0, RETAIN=False,
  1174. reasonCode="Normal disconnection"):
  1175. object.__setattr__(self, "names",
  1176. ["fh", "DUP", "QoS", "RETAIN", "reasonCode", "properties"])
  1177. self.fh = FixedHeaders(PacketTypes.DISCONNECT)
  1178. self.fh.DUP = DUP
  1179. self.fh.QoS = QoS
  1180. self.fh.RETAIN = RETAIN
  1181. # variable header
  1182. self.reasonCode = ReasonCodes(PacketTypes.DISCONNECT, aName=reasonCode)
  1183. self.properties = Properties(PacketTypes.DISCONNECT)
  1184. if buffer != None:
  1185. self.unpack(buffer)
  1186. def pack(self):
  1187. buffer = b""
  1188. if self.reasonCode.getName() != "Normal disconnection" or not self.properties.isEmpty():
  1189. buffer += self.reasonCode.pack()
  1190. if not self.properties.isEmpty():
  1191. buffer += self.properties.pack()
  1192. buffer = self.fh.pack(len(buffer)) + buffer
  1193. return buffer
  1194. def unpack(self, buffer, maximumPacketSize):
  1195. self.properties.clear()
  1196. self.reasonCode.set("Normal disconnection")
  1197. assert len(buffer) >= 2
  1198. assert PacketType(buffer) == PacketTypes.DISCONNECT
  1199. fhlen = self.fh.unpack(buffer, maximumPacketSize)
  1200. assert len(buffer) >= fhlen + self.fh.remainingLength
  1201. assert self.fh.DUP == False, "[MQTT-2.1.2-1] DISCONNECT reserved bits must be 0"
  1202. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] DISCONNECT reserved bits must be 0"
  1203. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] DISCONNECT reserved bits must be 0"
  1204. curlen = fhlen
  1205. if self.fh.remainingLength > 0:
  1206. self.reasonCode.unpack(buffer[curlen:])
  1207. curlen += 1
  1208. if self.fh.remainingLength > 1:
  1209. curlen += self.properties.unpack(buffer[curlen:])[1]
  1210. assert curlen == fhlen + self.fh.remainingLength, \
  1211. "DISCONNECT packet is wrong length %d" % self.fh.remainingLength
  1212. return fhlen + self.fh.remainingLength
  1213. def __str__(self):
  1214. return str(self.fh)+", ReasonCode: "+str(self.reasonCode)+", Properties: "+str(self.properties)
  1215. def __eq__(self, packet):
  1216. return Packets.__eq__(self, packet) and \
  1217. self.reasonCode == packet.reasonCode and \
  1218. self.properties == packet.properties
  1219. class Auths(Packets):
  1220. def __init__(self, buffer=None, DUP=False, QoS=0, RETAIN=False,
  1221. reasonCode="Success"):
  1222. object.__setattr__(self, "names",
  1223. ["fh", "DUP", "QoS", "RETAIN", "reasonCode", "properties"])
  1224. self.fh = FixedHeaders(PacketTypes.AUTH)
  1225. self.fh.DUP = DUP
  1226. self.fh.QoS = QoS
  1227. self.fh.RETAIN = RETAIN
  1228. # variable header
  1229. self.reasonCode = ReasonCodes(PacketTypes.AUTH, reasonCode)
  1230. self.properties = Properties(PacketTypes.AUTH)
  1231. if buffer != None:
  1232. self.unpack(buffer)
  1233. def pack(self):
  1234. buffer = self.reasonCode.pack()
  1235. buffer += self.properties.pack()
  1236. buffer = self.fh.pack(len(buffer)) + buffer
  1237. return buffer
  1238. def unpack(self, buffer, maximumPacketSize):
  1239. assert len(buffer) >= 2
  1240. assert PacketType(buffer) == PacketTypes.AUTH
  1241. fhlen = self.fh.unpack(buffer, maximumPacketSize)
  1242. assert len(buffer) >= fhlen + self.fh.remainingLength
  1243. assert self.fh.DUP == False, "[MQTT-2.1.2-1] AUTH reserved bits must be 0"
  1244. assert self.fh.QoS == 0, "[MQTT-2.1.2-1] AUTH reserved bits must be 0"
  1245. assert self.fh.RETAIN == False, "[MQTT-2.1.2-1] AUTH reserved bits must be 0"
  1246. curlen = fhlen
  1247. curlen += self.reasonCode.unpack(buffer[curlen:])
  1248. curlen += self.properties.unpack(buffer[curlen:])[1]
  1249. assert curlen == fhlen + self.fh.remainingLength, \
  1250. "AUTH packet is wrong length %d %d" % (self.fh.remainingLength, curlen)
  1251. return fhlen + self.fh.remainingLength
  1252. def __str__(self):
  1253. return str(self.fh)+", ReasonCode: "+str(self.reasonCode)+", Properties: "+str(self.properties)
  1254. def __eq__(self, packet):
  1255. return Packets.__eq__(self, packet) and \
  1256. self.reasonCode == packet.reasonCode and \
  1257. self.properties == packet.properties
  1258. classes = [Connects, Connacks, Publishes, Pubacks, Pubrecs,
  1259. Pubrels, Pubcomps, Subscribes, Subacks, Unsubscribes,
  1260. Unsubacks, Pingreqs, Pingresps, Disconnects, Auths]
  1261. def unpackPacket(buffer, maximumPacketSize=MAX_PACKET_SIZE):
  1262. if PacketType(buffer) != None:
  1263. packet = classes[PacketType(buffer)-1]()
  1264. packet.unpack(buffer, maximumPacketSize=maximumPacketSize)
  1265. else:
  1266. packet = None
  1267. return packet