mqttasync_module.c 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026
  1. #include <Python.h>
  2. #include "MQTTAsync.h"
  3. #include "LinkedList.h"
  4. static PyObject *MqttV3Error;
  5. static PyObject* mqttv3_create(PyObject* self, PyObject *args)
  6. {
  7. MQTTAsync c;
  8. char* serverURI;
  9. char* clientId;
  10. int persistence_option = MQTTCLIENT_PERSISTENCE_DEFAULT;
  11. PyObject *pyoptions = NULL, *temp;
  12. MQTTAsync_createOptions options = MQTTAsync_createOptions_initializer;
  13. int rc;
  14. if (!PyArg_ParseTuple(args, "ss|iO", &serverURI, &clientId,
  15. &persistence_option, &pyoptions))
  16. return NULL;
  17. if (persistence_option != MQTTCLIENT_PERSISTENCE_DEFAULT
  18. && persistence_option != MQTTCLIENT_PERSISTENCE_NONE)
  19. {
  20. PyErr_SetString(PyExc_TypeError, "persistence must be DEFAULT or NONE");
  21. return NULL;
  22. }
  23. if (pyoptions)
  24. {
  25. if (!PyDict_Check(pyoptions))
  26. {
  27. PyErr_SetString(PyExc_TypeError,
  28. "Create options parameter must be a dictionary");
  29. return NULL;
  30. }
  31. if ((temp = PyDict_GetItemString(pyoptions, "sendWhileDisconnected"))
  32. != NULL)
  33. {
  34. if (PyInt_Check(temp))
  35. options.sendWhileDisconnected = (int) PyInt_AsLong(temp);
  36. else
  37. {
  38. PyErr_SetString(PyExc_TypeError, "sendWhileDisconnected value must be int");
  39. return NULL;
  40. }
  41. }
  42. if ((temp = PyDict_GetItemString(pyoptions, "maxBufferedMessages"))
  43. != NULL)
  44. {
  45. if (PyInt_Check(temp))
  46. options.maxBufferedMessages = (int) PyInt_AsLong(temp);
  47. else
  48. {
  49. PyErr_SetString(PyExc_TypeError, "maxBufferedMessages value must be int");
  50. return NULL;
  51. }
  52. }
  53. rc = MQTTAsync_createWithOptions(&c, serverURI, clientId, persistence_option, NULL, &options);
  54. }
  55. else
  56. rc = MQTTAsync_create(&c, serverURI, clientId, persistence_option, NULL);
  57. return Py_BuildValue("ik", rc, c);
  58. }
  59. static List* callbacks = NULL;
  60. static List* connected_callbacks = NULL;
  61. enum msgTypes
  62. {
  63. CONNECT, PUBLISH, SUBSCRIBE, SUBSCRIBE_MANY, UNSUBSCRIBE
  64. };
  65. typedef struct
  66. {
  67. MQTTAsync c;
  68. PyObject *context;
  69. PyObject *cl, *ma, *dc;
  70. } CallbackEntry;
  71. typedef struct
  72. {
  73. MQTTAsync c;
  74. PyObject *context;
  75. PyObject *co;
  76. } ConnectedEntry;
  77. int clientCompare(void* a, void* b)
  78. {
  79. CallbackEntry* e = (CallbackEntry*) a;
  80. return e->c == (MQTTAsync) b;
  81. }
  82. int connectedCompare(void* a, void* b)
  83. {
  84. ConnectedEntry* e = (ConnectedEntry*) a;
  85. return e->c == (MQTTAsync) b;
  86. }
  87. void connected(void* context, char* cause)
  88. {
  89. /* call the right Python function, using the context */
  90. PyObject *arglist;
  91. PyObject *result;
  92. ConnectedEntry* e = context;
  93. PyGILState_STATE gstate;
  94. gstate = PyGILState_Ensure();
  95. arglist = Py_BuildValue("Os", e->context, cause);
  96. result = PyEval_CallObject(e->co, arglist);
  97. Py_DECREF(arglist);
  98. PyGILState_Release(gstate);
  99. }
  100. void connectionLost(void* context, char* cause)
  101. {
  102. /* call the right Python function, using the context */
  103. PyObject *arglist;
  104. PyObject *result;
  105. CallbackEntry* e = context;
  106. PyGILState_STATE gstate;
  107. gstate = PyGILState_Ensure();
  108. arglist = Py_BuildValue("Os", e->context, cause);
  109. result = PyEval_CallObject(e->cl, arglist);
  110. Py_DECREF(arglist);
  111. PyGILState_Release(gstate);
  112. }
  113. int messageArrived(void* context, char* topicName, int topicLen,
  114. MQTTAsync_message* message)
  115. {
  116. PyObject *result = NULL;
  117. CallbackEntry* e = context;
  118. int rc = -99;
  119. PyGILState_STATE gstate;
  120. gstate = PyGILState_Ensure();
  121. if (topicLen == 0)
  122. result = PyObject_CallFunction(e->ma, "Os{ss#sisisisi}", e->context,
  123. topicName, "payload", message->payload, message->payloadlen,
  124. "qos", message->qos, "retained", message->retained, "dup",
  125. message->dup, "msgid", message->msgid);
  126. else
  127. result = PyObject_CallFunction(e->ma, "Os#{ss#sisisisi}", e->context,
  128. topicName, topicLen, "payload", message->payload,
  129. message->payloadlen, "qos", message->qos, "retained",
  130. message->retained, "dup", message->dup, "msgid",
  131. message->msgid);
  132. if (result)
  133. {
  134. if (PyInt_Check(result))
  135. rc = (int) PyInt_AsLong(result);
  136. Py_DECREF(result);
  137. }
  138. PyGILState_Release(gstate);
  139. MQTTAsync_free(topicName);
  140. MQTTAsync_freeMessage(&message);
  141. return rc;
  142. }
  143. void deliveryComplete(void* context, MQTTAsync_token dt)
  144. {
  145. PyObject *arglist;
  146. PyObject *result;
  147. CallbackEntry* e = context;
  148. PyGILState_STATE gstate;
  149. gstate = PyGILState_Ensure();
  150. arglist = Py_BuildValue("Oi", e->context, dt);
  151. result = PyEval_CallObject(e->dc, arglist);
  152. Py_DECREF(arglist);
  153. PyGILState_Release(gstate);
  154. }
  155. static PyObject* mqttv3_setcallbacks(PyObject* self, PyObject *args)
  156. {
  157. MQTTAsync c;
  158. CallbackEntry* e = NULL;
  159. int rc;
  160. e = malloc(sizeof(CallbackEntry));
  161. if (!PyArg_ParseTuple(args, "kOOOO", &c, (PyObject**) &e->context, &e->cl,
  162. &e->ma, &e->dc))
  163. return NULL;
  164. e->c = c;
  165. if ((e->cl != Py_None && !PyCallable_Check(e->cl))
  166. || (e->ma != Py_None && !PyCallable_Check(e->ma))
  167. || (e->dc != Py_None && !PyCallable_Check(e->dc)))
  168. {
  169. PyErr_SetString(PyExc_TypeError,
  170. "3rd, 4th and 5th parameters must be callable or None");
  171. return NULL;
  172. }
  173. Py_BEGIN_ALLOW_THREADS
  174. rc = MQTTAsync_setCallbacks(c, e, connectionLost, messageArrived, deliveryComplete);
  175. Py_END_ALLOW_THREADS
  176. if (rc == MQTTASYNC_SUCCESS)
  177. {
  178. ListElement* temp = NULL;
  179. if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
  180. {
  181. ListDetach(callbacks, temp->content);
  182. free(temp->content);
  183. }
  184. ListAppend(callbacks, e, sizeof(e));
  185. Py_XINCREF(e->cl);
  186. Py_XINCREF(e->ma);
  187. Py_XINCREF(e->dc);
  188. Py_XINCREF(e->context);
  189. }
  190. return Py_BuildValue("i", rc);
  191. }
  192. static PyObject* mqttv3_setconnected(PyObject* self, PyObject *args)
  193. {
  194. MQTTAsync c;
  195. ConnectedEntry* e = NULL;
  196. int rc;
  197. e = malloc(sizeof(ConnectedEntry));
  198. if (!PyArg_ParseTuple(args, "kOO", &c, (PyObject**) &e->context, &e->co))
  199. return NULL;
  200. e->c = c;
  201. if (e->co != Py_None && !PyCallable_Check(e->co))
  202. {
  203. PyErr_SetString(PyExc_TypeError,
  204. "3rd parameter must be callable or None");
  205. return NULL;
  206. }
  207. Py_BEGIN_ALLOW_THREADS
  208. rc = MQTTAsync_setConnected(c, e, connected);
  209. Py_END_ALLOW_THREADS
  210. if (rc == MQTTASYNC_SUCCESS)
  211. {
  212. ListElement* temp = NULL;
  213. if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL)
  214. {
  215. ListDetach(connected_callbacks, temp->content);
  216. free(temp->content);
  217. }
  218. ListAppend(connected_callbacks, e, sizeof(e));
  219. Py_XINCREF(e->co);
  220. Py_XINCREF(e->context);
  221. }
  222. return Py_BuildValue("i", rc);
  223. }
  224. typedef struct
  225. {
  226. MQTTAsync c;
  227. PyObject *context;
  228. PyObject *onSuccess, *onFailure;
  229. enum msgTypes msgType;
  230. } ResponseEntry;
  231. void onSuccess(void* context, MQTTAsync_successData* response)
  232. {
  233. PyObject *result = NULL;
  234. ResponseEntry* e = context;
  235. PyGILState_STATE gstate;
  236. gstate = PyGILState_Ensure();
  237. switch (e->msgType)
  238. {
  239. case CONNECT:
  240. result = PyObject_CallFunction(e->onSuccess, "O{sisiss}", (e->context) ? e->context : Py_None,
  241. "MQTTVersion", response->alt.connect.MQTTVersion,
  242. "sessionPresent", response->alt.connect.sessionPresent,
  243. "serverURI", response->alt.connect.serverURI);
  244. break;
  245. case PUBLISH:
  246. result = PyObject_CallFunction(e->onSuccess, "O{si ss s{ss# sisi}}", (e->context) ? e->context : Py_None,
  247. "token", response->token,
  248. "destinationName", response->alt.pub.destinationName,
  249. "message",
  250. "payload", response->alt.pub.message.payload,
  251. response->alt.pub.message.payloadlen,
  252. "qos", response->alt.pub.message.qos,
  253. "retained", response->alt.pub.message.retained);
  254. break;
  255. case SUBSCRIBE:
  256. result = PyObject_CallFunction(e->onSuccess, "O{sisi}", (e->context) ? e->context : Py_None,
  257. "token", response->token,
  258. "qos", response->alt.qos);
  259. break;
  260. case SUBSCRIBE_MANY:
  261. result = PyObject_CallFunction(e->onSuccess, "O{sis[i]}", (e->context) ? e->context : Py_None,
  262. "token", response->token,
  263. "qosList", response->alt.qosList[0]);
  264. break;
  265. case UNSUBSCRIBE:
  266. result = PyObject_CallFunction(e->onSuccess, "O{si}", (e->context) ? e->context : Py_None,
  267. "token", response->token);
  268. break;
  269. }
  270. if (result)
  271. {
  272. Py_DECREF(result);
  273. printf("decrementing reference count for result\n");
  274. }
  275. PyGILState_Release(gstate);
  276. free(e);
  277. }
  278. void onFailure(void* context, MQTTAsync_failureData* response)
  279. {
  280. PyObject *result = NULL;
  281. PyObject *arglist = NULL;
  282. ResponseEntry* e = context;
  283. PyGILState_STATE gstate;
  284. gstate = PyGILState_Ensure();
  285. // TODO: convert response into Python structure
  286. if (e->context)
  287. arglist = Py_BuildValue("OO", e->context, response);
  288. else
  289. arglist = Py_BuildValue("OO", Py_None, response);
  290. result = PyEval_CallObject(e->onFailure, arglist);
  291. Py_DECREF(arglist);
  292. PyGILState_Release(gstate);
  293. free(e);
  294. }
  295. /* return true if ok, false otherwise */
  296. int getResponseOptions(MQTTAsync c, PyObject *pyoptions, MQTTAsync_responseOptions* responseOptions,
  297. enum msgTypes msgType)
  298. {
  299. PyObject *temp = NULL;
  300. if (!pyoptions)
  301. return 1;
  302. if (!PyDict_Check(pyoptions))
  303. {
  304. PyErr_SetString(PyExc_TypeError, "Response options must be a dictionary");
  305. return 0;
  306. }
  307. if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL)
  308. {
  309. if (PyCallable_Check(temp)) /* temp points to Python function */
  310. responseOptions->onSuccess = (MQTTAsync_onSuccess*)temp;
  311. else
  312. {
  313. PyErr_SetString(PyExc_TypeError,
  314. "onSuccess value must be callable");
  315. return 0;
  316. }
  317. }
  318. if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL)
  319. {
  320. if (PyCallable_Check(temp))
  321. responseOptions->onFailure = (MQTTAsync_onFailure*)temp;
  322. else
  323. {
  324. PyErr_SetString(PyExc_TypeError,
  325. "onFailure value must be callable");
  326. return 0;
  327. }
  328. }
  329. responseOptions->context = PyDict_GetItemString(pyoptions, "context");
  330. if (responseOptions->onFailure || responseOptions->onSuccess)
  331. {
  332. ResponseEntry* r = malloc(sizeof(ResponseEntry));
  333. r->c = c;
  334. r->context = responseOptions->context;
  335. responseOptions->context = r;
  336. r->onSuccess = (PyObject*)responseOptions->onSuccess;
  337. responseOptions->onSuccess = onSuccess;
  338. r->onFailure = (PyObject*)responseOptions->onFailure;
  339. responseOptions->onFailure = onFailure;
  340. r->msgType = msgType;
  341. }
  342. return 1; /* not an error, if we get here */
  343. }
  344. static PyObject* mqttv3_connect(PyObject* self, PyObject *args)
  345. {
  346. MQTTAsync c;
  347. PyObject *pyoptions = NULL, *temp;
  348. MQTTAsync_connectOptions connectOptions = MQTTAsync_connectOptions_initializer;
  349. MQTTAsync_willOptions willOptions = MQTTAsync_willOptions_initializer;
  350. int rc;
  351. if (!PyArg_ParseTuple(args, "k|O", &c, &pyoptions))
  352. return NULL;
  353. if (!pyoptions)
  354. goto skip;
  355. if (!PyDict_Check(pyoptions))
  356. {
  357. PyErr_SetString(PyExc_TypeError, "2nd parameter must be a dictionary");
  358. return NULL;
  359. }
  360. if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL)
  361. {
  362. if (PyCallable_Check(temp)) /* temp points to Python function */
  363. connectOptions.onSuccess = (MQTTAsync_onSuccess*)temp;
  364. else
  365. {
  366. PyErr_SetString(PyExc_TypeError,
  367. "onSuccess value must be callable");
  368. return NULL;
  369. }
  370. }
  371. if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL)
  372. {
  373. if (PyCallable_Check(temp))
  374. connectOptions.onFailure = (MQTTAsync_onFailure*)temp;
  375. else
  376. {
  377. PyErr_SetString(PyExc_TypeError,
  378. "onFailure value must be callable");
  379. return NULL;
  380. }
  381. }
  382. connectOptions.context = PyDict_GetItemString(pyoptions, "context");
  383. if (connectOptions.onFailure || connectOptions.onSuccess)
  384. {
  385. ResponseEntry* r = malloc(sizeof(ResponseEntry));
  386. r->c = c;
  387. r->context = connectOptions.context;
  388. connectOptions.context = r;
  389. r->onSuccess = (PyObject*)connectOptions.onSuccess;
  390. connectOptions.onSuccess = onSuccess;
  391. r->onFailure = (PyObject*)connectOptions.onFailure;
  392. connectOptions.onFailure = onFailure;
  393. r->msgType = CONNECT;
  394. }
  395. if ((temp = PyDict_GetItemString(pyoptions, "keepAliveInterval")) != NULL)
  396. {
  397. if (PyInt_Check(temp))
  398. connectOptions.keepAliveInterval = (int) PyInt_AsLong(temp);
  399. else
  400. {
  401. PyErr_SetString(PyExc_TypeError,
  402. "keepAliveLiveInterval value must be int");
  403. return NULL;
  404. }
  405. }
  406. if ((temp = PyDict_GetItemString(pyoptions, "cleansession")) != NULL)
  407. {
  408. if (PyInt_Check(temp))
  409. connectOptions.cleansession = (int) PyInt_AsLong(temp);
  410. else
  411. {
  412. PyErr_SetString(PyExc_TypeError, "cleansession value must be int");
  413. return NULL;
  414. }
  415. }
  416. if ((temp = PyDict_GetItemString(pyoptions, "will")) != NULL)
  417. {
  418. if (PyDict_Check(temp))
  419. {
  420. PyObject *wtemp = NULL;
  421. if ((wtemp = PyDict_GetItemString(temp, "topicName")) == NULL)
  422. {
  423. PyErr_SetString(PyExc_TypeError,
  424. "will topicName value must be set");
  425. return NULL;
  426. }
  427. else
  428. {
  429. if (PyString_Check(wtemp))
  430. willOptions.topicName = PyString_AsString(wtemp);
  431. else
  432. {
  433. PyErr_SetString(PyExc_TypeError,
  434. "will topicName value must be string");
  435. return NULL;
  436. }
  437. }
  438. if ((wtemp = PyDict_GetItemString(temp, "message")) == NULL)
  439. {
  440. PyErr_SetString(PyExc_TypeError,
  441. "will message value must be set");
  442. return NULL;
  443. }
  444. else
  445. {
  446. if (PyString_Check(wtemp))
  447. willOptions.message = PyString_AsString(wtemp);
  448. else
  449. {
  450. PyErr_SetString(PyExc_TypeError,
  451. "will message value must be string");
  452. return NULL;
  453. }
  454. }
  455. if ((wtemp = PyDict_GetItemString(temp, "retained")) != NULL)
  456. {
  457. if (PyInt_Check(wtemp))
  458. willOptions.retained = (int) PyInt_AsLong(wtemp);
  459. else
  460. {
  461. PyErr_SetString(PyExc_TypeError,
  462. "will retained value must be int");
  463. return NULL;
  464. }
  465. }
  466. if ((wtemp = PyDict_GetItemString(temp, "qos")) != NULL)
  467. {
  468. if (PyInt_Check(wtemp))
  469. willOptions.qos = (int) PyInt_AsLong(wtemp);
  470. else
  471. {
  472. PyErr_SetString(PyExc_TypeError,
  473. "will qos value must be int");
  474. return NULL;
  475. }
  476. }
  477. connectOptions.will = &willOptions;
  478. }
  479. else
  480. {
  481. PyErr_SetString(PyExc_TypeError, "will value must be dictionary");
  482. return NULL;
  483. }
  484. }
  485. if ((temp = PyDict_GetItemString(pyoptions, "username")) != NULL)
  486. {
  487. if (PyString_Check(temp))
  488. connectOptions.username = PyString_AsString(temp);
  489. else
  490. {
  491. PyErr_SetString(PyExc_TypeError, "username value must be string");
  492. return NULL;
  493. }
  494. }
  495. if ((temp = PyDict_GetItemString(pyoptions, "password")) != NULL)
  496. {
  497. if (PyString_Check(temp))
  498. connectOptions.username = PyString_AsString(temp);
  499. else
  500. {
  501. PyErr_SetString(PyExc_TypeError, "password value must be string");
  502. return NULL;
  503. }
  504. }
  505. if ((temp = PyDict_GetItemString(pyoptions, "automaticReconnect")) != NULL)
  506. {
  507. if (PyInt_Check(temp))
  508. connectOptions.automaticReconnect = (int) PyInt_AsLong(temp);
  509. else
  510. {
  511. PyErr_SetString(PyExc_TypeError, "automatic reconnect value must be int");
  512. return NULL;
  513. }
  514. }
  515. if ((temp = PyDict_GetItemString(pyoptions, "minRetryInterval")) != NULL)
  516. {
  517. if (PyInt_Check(temp))
  518. connectOptions.minRetryInterval = (int) PyInt_AsLong(temp);
  519. else
  520. {
  521. PyErr_SetString(PyExc_TypeError, "minRetryInterval value must be int");
  522. return NULL;
  523. }
  524. }
  525. if ((temp = PyDict_GetItemString(pyoptions, "maxRetryInterval")) != NULL)
  526. {
  527. if (PyInt_Check(temp))
  528. connectOptions.maxRetryInterval = (int) PyInt_AsLong(temp);
  529. else
  530. {
  531. PyErr_SetString(PyExc_TypeError, "maxRetryInterval value must be int");
  532. return NULL;
  533. }
  534. }
  535. skip:
  536. Py_BEGIN_ALLOW_THREADS
  537. rc = MQTTAsync_connect(c, &connectOptions);
  538. Py_END_ALLOW_THREADS
  539. return Py_BuildValue("i", rc);
  540. }
  541. static PyObject* mqttv3_disconnect(PyObject* self, PyObject *args)
  542. {
  543. MQTTAsync c;
  544. MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
  545. int rc;
  546. if (!PyArg_ParseTuple(args, "k|i", &c, &options.timeout))
  547. return NULL;
  548. Py_BEGIN_ALLOW_THREADS
  549. rc = MQTTAsync_disconnect(c, &options);
  550. Py_END_ALLOW_THREADS
  551. return Py_BuildValue("i", rc);
  552. }
  553. static PyObject* mqttv3_isConnected(PyObject* self, PyObject *args)
  554. {
  555. MQTTAsync c;
  556. int rc;
  557. if (!PyArg_ParseTuple(args, "k", &c))
  558. return NULL;
  559. Py_BEGIN_ALLOW_THREADS
  560. rc = MQTTAsync_isConnected(c);
  561. Py_END_ALLOW_THREADS
  562. return Py_BuildValue("i", rc);
  563. }
  564. static PyObject* mqttv3_subscribe(PyObject* self, PyObject *args)
  565. {
  566. MQTTAsync c;
  567. MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
  568. PyObject *pyoptions = NULL;
  569. char* topic;
  570. int qos = 2;
  571. int rc;
  572. if (!PyArg_ParseTuple(args, "ks|iO", &c, &topic, &qos, &pyoptions))
  573. return NULL;
  574. if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE))
  575. return NULL;
  576. Py_BEGIN_ALLOW_THREADS;
  577. rc = MQTTAsync_subscribe(c, topic, qos, &response);
  578. Py_END_ALLOW_THREADS;
  579. return Py_BuildValue("i", rc);
  580. }
  581. static PyObject* mqttv3_subscribeMany(PyObject* self, PyObject *args)
  582. {
  583. MQTTAsync c;
  584. MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
  585. PyObject* topicList;
  586. PyObject* qosList;
  587. PyObject *pyoptions = NULL;
  588. int count;
  589. char** topics;
  590. int* qoss;
  591. int i, rc = 0;
  592. if (!PyArg_ParseTuple(args, "kOO|O", &c, &topicList, &qosList, &pyoptions))
  593. return NULL;
  594. if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE))
  595. return NULL;
  596. if (!PySequence_Check(topicList) || !PySequence_Check(qosList))
  597. {
  598. PyErr_SetString(PyExc_TypeError,
  599. "3rd and 4th parameters must be sequences");
  600. return NULL;
  601. }
  602. if ((count = PySequence_Length(topicList)) != PySequence_Length(qosList))
  603. {
  604. PyErr_SetString(PyExc_TypeError,
  605. "3rd and 4th parameters must be sequences of the same length");
  606. return NULL;
  607. }
  608. topics = malloc(count * sizeof(char*));
  609. for (i = 0; i < count; ++i)
  610. topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
  611. qoss = malloc(count * sizeof(int));
  612. for (i = 0; i < count; ++i)
  613. qoss[i] = (int) PyInt_AsLong(PySequence_GetItem(qosList, i));
  614. Py_BEGIN_ALLOW_THREADS
  615. rc = MQTTAsync_subscribeMany(c, count, topics, qoss, &response);
  616. Py_END_ALLOW_THREADS
  617. for (i = 0; i < count; ++i)
  618. PySequence_SetItem(qosList, i, PyInt_FromLong((long) qoss[i]));
  619. free(topics);
  620. free(qoss);
  621. if (rc == MQTTASYNC_SUCCESS)
  622. return Py_BuildValue("iO", rc, qosList);
  623. else
  624. return Py_BuildValue("i", rc);
  625. }
  626. static PyObject* mqttv3_unsubscribe(PyObject* self, PyObject *args)
  627. {
  628. MQTTAsync c;
  629. MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
  630. PyObject *pyoptions = NULL;
  631. char* topic;
  632. int rc;
  633. if (!PyArg_ParseTuple(args, "ks|O", &c, &topic, &pyoptions))
  634. return NULL;
  635. if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE))
  636. return NULL;
  637. Py_BEGIN_ALLOW_THREADS
  638. rc = MQTTAsync_unsubscribe(c, topic, &response);
  639. Py_END_ALLOW_THREADS
  640. return Py_BuildValue("i", rc);
  641. }
  642. static PyObject* mqttv3_unsubscribeMany(PyObject* self, PyObject *args)
  643. {
  644. MQTTAsync c;
  645. MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
  646. PyObject* topicList;
  647. PyObject *pyoptions = NULL;
  648. int count;
  649. char** topics;
  650. int i, rc = 0;
  651. if (!PyArg_ParseTuple(args, "kO|O", &c, &topicList, &pyoptions))
  652. return NULL;
  653. if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE))
  654. return NULL;
  655. if (!PySequence_Check(topicList))
  656. {
  657. PyErr_SetString(PyExc_TypeError, "3rd parameter must be sequences");
  658. return NULL;
  659. }
  660. count = PySequence_Length(topicList);
  661. topics = malloc(count * sizeof(char*));
  662. for (i = 0; i < count; ++i)
  663. topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
  664. Py_BEGIN_ALLOW_THREADS
  665. rc = MQTTAsync_unsubscribeMany(c, count, topics, &response);
  666. Py_END_ALLOW_THREADS
  667. free(topics);
  668. return Py_BuildValue("i", rc);
  669. }
  670. static PyObject* mqttv3_send(PyObject* self, PyObject *args)
  671. {
  672. MQTTAsync c;
  673. char* destinationName;
  674. int payloadlen;
  675. void* payload;
  676. int qos = 0;
  677. int retained = 0;
  678. MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
  679. PyObject *pyoptions = NULL;
  680. int rc;
  681. if (!PyArg_ParseTuple(args, "kss#|iiO", &c, &destinationName, &payload,
  682. &payloadlen, &qos, &retained, &pyoptions))
  683. return NULL;
  684. if (!getResponseOptions(c, pyoptions, &response, PUBLISH))
  685. return NULL;
  686. Py_BEGIN_ALLOW_THREADS
  687. rc = MQTTAsync_send(c, destinationName, payloadlen, payload, qos, retained, &response);
  688. Py_END_ALLOW_THREADS
  689. if (rc == MQTTASYNC_SUCCESS && qos > 0)
  690. return Py_BuildValue("ii", rc, response);
  691. else
  692. return Py_BuildValue("i", rc);
  693. }
  694. static PyObject* mqttv3_sendMessage(PyObject* self, PyObject *args)
  695. {
  696. MQTTAsync c;
  697. char* destinationName;
  698. PyObject *message, *temp;
  699. MQTTAsync_message msg = MQTTAsync_message_initializer;
  700. MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
  701. PyObject *pyoptions = NULL;
  702. int rc;
  703. if (!PyArg_ParseTuple(args, "ksO|O", &c, &destinationName, &message, &pyoptions))
  704. return NULL;
  705. if (!getResponseOptions(c, pyoptions, &response, PUBLISH))
  706. return NULL;
  707. if (!PyDict_Check(message))
  708. {
  709. PyErr_SetString(PyExc_TypeError, "3rd parameter must be a dictionary");
  710. return NULL;
  711. }
  712. if ((temp = PyDict_GetItemString(message, "payload")) == NULL)
  713. {
  714. PyErr_SetString(PyExc_TypeError, "dictionary must have payload key");
  715. return NULL;
  716. }
  717. if (PyString_Check(temp))
  718. PyString_AsStringAndSize(temp, (char**) &msg.payload,
  719. (Py_ssize_t*) &msg.payloadlen);
  720. else
  721. {
  722. PyErr_SetString(PyExc_TypeError, "payload value must be string");
  723. return NULL;
  724. }
  725. if ((temp = PyDict_GetItemString(message, "qos")) == NULL)
  726. msg.qos = (int) PyInt_AsLong(temp);
  727. if ((temp = PyDict_GetItemString(message, "retained")) == NULL)
  728. msg.retained = (int) PyInt_AsLong(temp);
  729. Py_BEGIN_ALLOW_THREADS
  730. rc = MQTTAsync_sendMessage(c, destinationName, &msg, &response);
  731. Py_END_ALLOW_THREADS
  732. if (rc == MQTTASYNC_SUCCESS && msg.qos > 0)
  733. return Py_BuildValue("ii", rc, response);
  734. else
  735. return Py_BuildValue("i", rc);
  736. }
  737. static PyObject* mqttv3_waitForCompletion(PyObject* self, PyObject *args)
  738. {
  739. MQTTAsync c;
  740. unsigned long timeout = 1000L;
  741. MQTTAsync_token dt;
  742. int rc;
  743. if (!PyArg_ParseTuple(args, "ki|i", &c, &dt, &timeout))
  744. return NULL;
  745. Py_BEGIN_ALLOW_THREADS
  746. rc = MQTTAsync_waitForCompletion(c, dt, timeout);
  747. Py_END_ALLOW_THREADS
  748. return Py_BuildValue("i", rc);
  749. }
  750. static PyObject* mqttv3_getPendingTokens(PyObject* self, PyObject *args)
  751. {
  752. MQTTAsync c;
  753. MQTTAsync_token* tokens;
  754. int rc;
  755. if (!PyArg_ParseTuple(args, "k", &c))
  756. return NULL;
  757. Py_BEGIN_ALLOW_THREADS
  758. rc = MQTTAsync_getPendingTokens(c, &tokens);
  759. Py_END_ALLOW_THREADS
  760. if (rc == MQTTASYNC_SUCCESS)
  761. {
  762. int i = 0;
  763. PyObject* dts = PyList_New(0);
  764. while (tokens[i] != -1)
  765. PyList_Append(dts, PyInt_FromLong((long) tokens[i]));
  766. return Py_BuildValue("iO", rc, dts);
  767. }
  768. else
  769. return Py_BuildValue("i", rc);
  770. }
  771. static PyObject* mqttv3_destroy(PyObject* self, PyObject *args)
  772. {
  773. MQTTAsync c;
  774. ListElement* temp = NULL;
  775. if (!PyArg_ParseTuple(args, "k", &c))
  776. return NULL;
  777. if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
  778. {
  779. ListDetach(callbacks, temp->content);
  780. free(temp->content);
  781. }
  782. if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL)
  783. {
  784. ListDetach(connected_callbacks, temp->content);
  785. free(temp->content);
  786. }
  787. MQTTAsync_destroy(&c);
  788. Py_INCREF(Py_None);
  789. return Py_None;
  790. }
  791. static PyMethodDef MqttV3Methods[] =
  792. {
  793. { "create", mqttv3_create, METH_VARARGS, "Create an MQTTv3 client." },
  794. { "setcallbacks", mqttv3_setcallbacks, METH_VARARGS,
  795. "Sets the callback functions for a particular client." },
  796. { "setconnected", mqttv3_setconnected, METH_VARARGS,
  797. "Sets the connected callback function for a particular client." },
  798. { "connect", mqttv3_connect, METH_VARARGS,
  799. "Connects to a server using the specified options." },
  800. { "disconnect", mqttv3_disconnect, METH_VARARGS,
  801. "Disconnects from a server." },
  802. { "isConnected", mqttv3_isConnected, METH_VARARGS,
  803. "Determines if this client is currently connected to the server." },
  804. { "subscribe", mqttv3_subscribe, METH_VARARGS,
  805. "Subscribe to the given topic." },
  806. { "subscribeMany", mqttv3_subscribeMany, METH_VARARGS,
  807. "Subscribe to the given topics." },
  808. { "unsubscribe", mqttv3_unsubscribe, METH_VARARGS,
  809. "Unsubscribe from the given topic." },
  810. { "unsubscribeMany", mqttv3_unsubscribeMany, METH_VARARGS,
  811. "Unsubscribe from the given topics." },
  812. { "send", mqttv3_send, METH_VARARGS,
  813. "Publish a message to the given topic." },
  814. { "sendMessage", mqttv3_sendMessage, METH_VARARGS,
  815. "Publish a message to the given topic." },
  816. { "waitForCompletion", mqttv3_waitForCompletion, METH_VARARGS,
  817. "Waits for the completion of the delivery of the message represented by a delivery token." },
  818. { "getPendingTokens", mqttv3_getPendingTokens, METH_VARARGS,
  819. "Returns the tokens pending of completion." },
  820. { "destroy", mqttv3_destroy, METH_VARARGS,
  821. "Free memory allocated to a MQTT client. It is the opposite to create." },
  822. { NULL, NULL, 0, NULL } /* Sentinel */
  823. };
  824. PyMODINIT_FUNC initpaho_mqtt3a(void)
  825. {
  826. PyObject *m;
  827. PyEval_InitThreads();
  828. callbacks = ListInitialize();
  829. connected_callbacks = ListInitialize();
  830. m = Py_InitModule("paho_mqtt3a", MqttV3Methods);
  831. if (m == NULL)
  832. return;
  833. MqttV3Error = PyErr_NewException("paho_mqtt3a.error", NULL, NULL);
  834. Py_INCREF(MqttV3Error);
  835. PyModule_AddObject(m, "error", MqttV3Error);
  836. PyModule_AddIntConstant(m, "SUCCESS", MQTTASYNC_SUCCESS);
  837. PyModule_AddIntConstant(m, "FAILURE", MQTTASYNC_FAILURE);
  838. PyModule_AddIntConstant(m, "DISCONNECTED", MQTTASYNC_DISCONNECTED);
  839. PyModule_AddIntConstant(m, "MAX_MESSAGES_INFLIGHT", MQTTASYNC_MAX_MESSAGES_INFLIGHT);
  840. PyModule_AddIntConstant(m, "BAD_UTF8_STRING", MQTTASYNC_BAD_UTF8_STRING);
  841. PyModule_AddIntConstant(m, "BAD_NULL_PARAMETER", MQTTASYNC_NULL_PARAMETER);
  842. PyModule_AddIntConstant(m, "BAD_TOPICNAME_TRUNCATED", MQTTASYNC_TOPICNAME_TRUNCATED);
  843. PyModule_AddIntConstant(m, "PERSISTENCE_DEFAULT", MQTTCLIENT_PERSISTENCE_DEFAULT);
  844. PyModule_AddIntConstant(m, "PERSISTENCE_NONE", MQTTCLIENT_PERSISTENCE_NONE);
  845. PyModule_AddIntConstant(m, "PERSISTENCE_USER", MQTTCLIENT_PERSISTENCE_USER);
  846. PyModule_AddIntConstant(m, "PERSISTENCE_ERROR",
  847. MQTTCLIENT_PERSISTENCE_ERROR);
  848. }