12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026 |
- #include <Python.h>
- #include "MQTTAsync.h"
- #include "LinkedList.h"
- static PyObject *MqttV3Error;
- static PyObject* mqttv3_create(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- char* serverURI;
- char* clientId;
- int persistence_option = MQTTCLIENT_PERSISTENCE_DEFAULT;
- PyObject *pyoptions = NULL, *temp;
- MQTTAsync_createOptions options = MQTTAsync_createOptions_initializer;
- int rc;
- if (!PyArg_ParseTuple(args, "ss|iO", &serverURI, &clientId,
- &persistence_option, &pyoptions))
- return NULL;
- if (persistence_option != MQTTCLIENT_PERSISTENCE_DEFAULT
- && persistence_option != MQTTCLIENT_PERSISTENCE_NONE)
- {
- PyErr_SetString(PyExc_TypeError, "persistence must be DEFAULT or NONE");
- return NULL;
- }
- if (pyoptions)
- {
- if (!PyDict_Check(pyoptions))
- {
- PyErr_SetString(PyExc_TypeError,
- "Create options parameter must be a dictionary");
- return NULL;
- }
- if ((temp = PyDict_GetItemString(pyoptions, "sendWhileDisconnected"))
- != NULL)
- {
- if (PyInt_Check(temp))
- options.sendWhileDisconnected = (int) PyInt_AsLong(temp);
- else
- {
- PyErr_SetString(PyExc_TypeError, "sendWhileDisconnected value must be int");
- return NULL;
- }
- }
- if ((temp = PyDict_GetItemString(pyoptions, "maxBufferedMessages"))
- != NULL)
- {
- if (PyInt_Check(temp))
- options.maxBufferedMessages = (int) PyInt_AsLong(temp);
- else
- {
- PyErr_SetString(PyExc_TypeError, "maxBufferedMessages value must be int");
- return NULL;
- }
- }
- rc = MQTTAsync_createWithOptions(&c, serverURI, clientId, persistence_option, NULL, &options);
- }
- else
- rc = MQTTAsync_create(&c, serverURI, clientId, persistence_option, NULL);
- return Py_BuildValue("ik", rc, c);
- }
- static List* callbacks = NULL;
- static List* connected_callbacks = NULL;
/* Kind of request a ResponseEntry was created for; selects how the success
 * data is converted for the Python callback. */
enum msgTypes
{
	CONNECT = 0,
	PUBLISH = 1,
	SUBSCRIBE = 2,
	SUBSCRIBE_MANY = 3,
	UNSUBSCRIBE = 4
};
- typedef struct
- {
- MQTTAsync c;
- PyObject *context;
- PyObject *cl, *ma, *dc;
- } CallbackEntry;
- typedef struct
- {
- MQTTAsync c;
- PyObject *context;
- PyObject *co;
- } ConnectedEntry;
- int clientCompare(void* a, void* b)
- {
- CallbackEntry* e = (CallbackEntry*) a;
- return e->c == (MQTTAsync) b;
- }
- int connectedCompare(void* a, void* b)
- {
- ConnectedEntry* e = (ConnectedEntry*) a;
- return e->c == (MQTTAsync) b;
- }
- void connected(void* context, char* cause)
- {
- /* call the right Python function, using the context */
- PyObject *arglist;
- PyObject *result;
- ConnectedEntry* e = context;
- PyGILState_STATE gstate;
- gstate = PyGILState_Ensure();
- arglist = Py_BuildValue("Os", e->context, cause);
- result = PyEval_CallObject(e->co, arglist);
- Py_DECREF(arglist);
- PyGILState_Release(gstate);
- }
- void connectionLost(void* context, char* cause)
- {
- /* call the right Python function, using the context */
- PyObject *arglist;
- PyObject *result;
- CallbackEntry* e = context;
- PyGILState_STATE gstate;
- gstate = PyGILState_Ensure();
- arglist = Py_BuildValue("Os", e->context, cause);
- result = PyEval_CallObject(e->cl, arglist);
- Py_DECREF(arglist);
- PyGILState_Release(gstate);
- }
- int messageArrived(void* context, char* topicName, int topicLen,
- MQTTAsync_message* message)
- {
- PyObject *result = NULL;
- CallbackEntry* e = context;
- int rc = -99;
- PyGILState_STATE gstate;
- gstate = PyGILState_Ensure();
- if (topicLen == 0)
- result = PyObject_CallFunction(e->ma, "Os{ss#sisisisi}", e->context,
- topicName, "payload", message->payload, message->payloadlen,
- "qos", message->qos, "retained", message->retained, "dup",
- message->dup, "msgid", message->msgid);
- else
- result = PyObject_CallFunction(e->ma, "Os#{ss#sisisisi}", e->context,
- topicName, topicLen, "payload", message->payload,
- message->payloadlen, "qos", message->qos, "retained",
- message->retained, "dup", message->dup, "msgid",
- message->msgid);
- if (result)
- {
- if (PyInt_Check(result))
- rc = (int) PyInt_AsLong(result);
- Py_DECREF(result);
- }
- PyGILState_Release(gstate);
- MQTTAsync_free(topicName);
- MQTTAsync_freeMessage(&message);
- return rc;
- }
- void deliveryComplete(void* context, MQTTAsync_token dt)
- {
- PyObject *arglist;
- PyObject *result;
- CallbackEntry* e = context;
- PyGILState_STATE gstate;
- gstate = PyGILState_Ensure();
- arglist = Py_BuildValue("Oi", e->context, dt);
- result = PyEval_CallObject(e->dc, arglist);
- Py_DECREF(arglist);
- PyGILState_Release(gstate);
- }
- static PyObject* mqttv3_setcallbacks(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- CallbackEntry* e = NULL;
- int rc;
- e = malloc(sizeof(CallbackEntry));
- if (!PyArg_ParseTuple(args, "kOOOO", &c, (PyObject**) &e->context, &e->cl,
- &e->ma, &e->dc))
- return NULL;
- e->c = c;
- if ((e->cl != Py_None && !PyCallable_Check(e->cl))
- || (e->ma != Py_None && !PyCallable_Check(e->ma))
- || (e->dc != Py_None && !PyCallable_Check(e->dc)))
- {
- PyErr_SetString(PyExc_TypeError,
- "3rd, 4th and 5th parameters must be callable or None");
- return NULL;
- }
- Py_BEGIN_ALLOW_THREADS
- rc = MQTTAsync_setCallbacks(c, e, connectionLost, messageArrived, deliveryComplete);
- Py_END_ALLOW_THREADS
- if (rc == MQTTASYNC_SUCCESS)
- {
- ListElement* temp = NULL;
- if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
- {
- ListDetach(callbacks, temp->content);
- free(temp->content);
- }
- ListAppend(callbacks, e, sizeof(e));
- Py_XINCREF(e->cl);
- Py_XINCREF(e->ma);
- Py_XINCREF(e->dc);
- Py_XINCREF(e->context);
- }
- return Py_BuildValue("i", rc);
- }
- static PyObject* mqttv3_setconnected(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- ConnectedEntry* e = NULL;
- int rc;
- e = malloc(sizeof(ConnectedEntry));
- if (!PyArg_ParseTuple(args, "kOO", &c, (PyObject**) &e->context, &e->co))
- return NULL;
- e->c = c;
- if (e->co != Py_None && !PyCallable_Check(e->co))
- {
- PyErr_SetString(PyExc_TypeError,
- "3rd parameter must be callable or None");
- return NULL;
- }
- Py_BEGIN_ALLOW_THREADS
- rc = MQTTAsync_setConnected(c, e, connected);
- Py_END_ALLOW_THREADS
- if (rc == MQTTASYNC_SUCCESS)
- {
- ListElement* temp = NULL;
- if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL)
- {
- ListDetach(connected_callbacks, temp->content);
- free(temp->content);
- }
- ListAppend(connected_callbacks, e, sizeof(e));
- Py_XINCREF(e->co);
- Py_XINCREF(e->context);
- }
- return Py_BuildValue("i", rc);
- }
- typedef struct
- {
- MQTTAsync c;
- PyObject *context;
- PyObject *onSuccess, *onFailure;
- enum msgTypes msgType;
- } ResponseEntry;
- void onSuccess(void* context, MQTTAsync_successData* response)
- {
- PyObject *result = NULL;
- ResponseEntry* e = context;
- PyGILState_STATE gstate;
- gstate = PyGILState_Ensure();
- switch (e->msgType)
- {
- case CONNECT:
- result = PyObject_CallFunction(e->onSuccess, "O{sisiss}", (e->context) ? e->context : Py_None,
- "MQTTVersion", response->alt.connect.MQTTVersion,
- "sessionPresent", response->alt.connect.sessionPresent,
- "serverURI", response->alt.connect.serverURI);
- break;
- case PUBLISH:
- result = PyObject_CallFunction(e->onSuccess, "O{si ss s{ss# sisi}}", (e->context) ? e->context : Py_None,
- "token", response->token,
- "destinationName", response->alt.pub.destinationName,
- "message",
- "payload", response->alt.pub.message.payload,
- response->alt.pub.message.payloadlen,
- "qos", response->alt.pub.message.qos,
- "retained", response->alt.pub.message.retained);
- break;
- case SUBSCRIBE:
- result = PyObject_CallFunction(e->onSuccess, "O{sisi}", (e->context) ? e->context : Py_None,
- "token", response->token,
- "qos", response->alt.qos);
- break;
- case SUBSCRIBE_MANY:
- result = PyObject_CallFunction(e->onSuccess, "O{sis[i]}", (e->context) ? e->context : Py_None,
- "token", response->token,
- "qosList", response->alt.qosList[0]);
- break;
- case UNSUBSCRIBE:
- result = PyObject_CallFunction(e->onSuccess, "O{si}", (e->context) ? e->context : Py_None,
- "token", response->token);
- break;
- }
- if (result)
- {
- Py_DECREF(result);
- printf("decrementing reference count for result\n");
- }
- PyGILState_Release(gstate);
- free(e);
- }
- void onFailure(void* context, MQTTAsync_failureData* response)
- {
- PyObject *result = NULL;
- PyObject *arglist = NULL;
- ResponseEntry* e = context;
- PyGILState_STATE gstate;
- gstate = PyGILState_Ensure();
- // TODO: convert response into Python structure
- if (e->context)
- arglist = Py_BuildValue("OO", e->context, response);
- else
- arglist = Py_BuildValue("OO", Py_None, response);
- result = PyEval_CallObject(e->onFailure, arglist);
- Py_DECREF(arglist);
- PyGILState_Release(gstate);
- free(e);
- }
- /* return true if ok, false otherwise */
- int getResponseOptions(MQTTAsync c, PyObject *pyoptions, MQTTAsync_responseOptions* responseOptions,
- enum msgTypes msgType)
- {
- PyObject *temp = NULL;
- if (!pyoptions)
- return 1;
- if (!PyDict_Check(pyoptions))
- {
- PyErr_SetString(PyExc_TypeError, "Response options must be a dictionary");
- return 0;
- }
- if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL)
- {
- if (PyCallable_Check(temp)) /* temp points to Python function */
- responseOptions->onSuccess = (MQTTAsync_onSuccess*)temp;
- else
- {
- PyErr_SetString(PyExc_TypeError,
- "onSuccess value must be callable");
- return 0;
- }
- }
- if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL)
- {
- if (PyCallable_Check(temp))
- responseOptions->onFailure = (MQTTAsync_onFailure*)temp;
- else
- {
- PyErr_SetString(PyExc_TypeError,
- "onFailure value must be callable");
- return 0;
- }
- }
- responseOptions->context = PyDict_GetItemString(pyoptions, "context");
- if (responseOptions->onFailure || responseOptions->onSuccess)
- {
- ResponseEntry* r = malloc(sizeof(ResponseEntry));
- r->c = c;
- r->context = responseOptions->context;
- responseOptions->context = r;
- r->onSuccess = (PyObject*)responseOptions->onSuccess;
- responseOptions->onSuccess = onSuccess;
- r->onFailure = (PyObject*)responseOptions->onFailure;
- responseOptions->onFailure = onFailure;
- r->msgType = msgType;
- }
- return 1; /* not an error, if we get here */
- }
- static PyObject* mqttv3_connect(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- PyObject *pyoptions = NULL, *temp;
- MQTTAsync_connectOptions connectOptions = MQTTAsync_connectOptions_initializer;
- MQTTAsync_willOptions willOptions = MQTTAsync_willOptions_initializer;
- int rc;
- if (!PyArg_ParseTuple(args, "k|O", &c, &pyoptions))
- return NULL;
- if (!pyoptions)
- goto skip;
- if (!PyDict_Check(pyoptions))
- {
- PyErr_SetString(PyExc_TypeError, "2nd parameter must be a dictionary");
- return NULL;
- }
- if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL)
- {
- if (PyCallable_Check(temp)) /* temp points to Python function */
- connectOptions.onSuccess = (MQTTAsync_onSuccess*)temp;
- else
- {
- PyErr_SetString(PyExc_TypeError,
- "onSuccess value must be callable");
- return NULL;
- }
- }
- if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL)
- {
- if (PyCallable_Check(temp))
- connectOptions.onFailure = (MQTTAsync_onFailure*)temp;
- else
- {
- PyErr_SetString(PyExc_TypeError,
- "onFailure value must be callable");
- return NULL;
- }
- }
- connectOptions.context = PyDict_GetItemString(pyoptions, "context");
- if (connectOptions.onFailure || connectOptions.onSuccess)
- {
- ResponseEntry* r = malloc(sizeof(ResponseEntry));
- r->c = c;
- r->context = connectOptions.context;
- connectOptions.context = r;
- r->onSuccess = (PyObject*)connectOptions.onSuccess;
- connectOptions.onSuccess = onSuccess;
- r->onFailure = (PyObject*)connectOptions.onFailure;
- connectOptions.onFailure = onFailure;
- r->msgType = CONNECT;
- }
- if ((temp = PyDict_GetItemString(pyoptions, "keepAliveInterval")) != NULL)
- {
- if (PyInt_Check(temp))
- connectOptions.keepAliveInterval = (int) PyInt_AsLong(temp);
- else
- {
- PyErr_SetString(PyExc_TypeError,
- "keepAliveLiveInterval value must be int");
- return NULL;
- }
- }
- if ((temp = PyDict_GetItemString(pyoptions, "cleansession")) != NULL)
- {
- if (PyInt_Check(temp))
- connectOptions.cleansession = (int) PyInt_AsLong(temp);
- else
- {
- PyErr_SetString(PyExc_TypeError, "cleansession value must be int");
- return NULL;
- }
- }
- if ((temp = PyDict_GetItemString(pyoptions, "will")) != NULL)
- {
- if (PyDict_Check(temp))
- {
- PyObject *wtemp = NULL;
- if ((wtemp = PyDict_GetItemString(temp, "topicName")) == NULL)
- {
- PyErr_SetString(PyExc_TypeError,
- "will topicName value must be set");
- return NULL;
- }
- else
- {
- if (PyString_Check(wtemp))
- willOptions.topicName = PyString_AsString(wtemp);
- else
- {
- PyErr_SetString(PyExc_TypeError,
- "will topicName value must be string");
- return NULL;
- }
- }
- if ((wtemp = PyDict_GetItemString(temp, "message")) == NULL)
- {
- PyErr_SetString(PyExc_TypeError,
- "will message value must be set");
- return NULL;
- }
- else
- {
- if (PyString_Check(wtemp))
- willOptions.message = PyString_AsString(wtemp);
- else
- {
- PyErr_SetString(PyExc_TypeError,
- "will message value must be string");
- return NULL;
- }
- }
- if ((wtemp = PyDict_GetItemString(temp, "retained")) != NULL)
- {
- if (PyInt_Check(wtemp))
- willOptions.retained = (int) PyInt_AsLong(wtemp);
- else
- {
- PyErr_SetString(PyExc_TypeError,
- "will retained value must be int");
- return NULL;
- }
- }
- if ((wtemp = PyDict_GetItemString(temp, "qos")) != NULL)
- {
- if (PyInt_Check(wtemp))
- willOptions.qos = (int) PyInt_AsLong(wtemp);
- else
- {
- PyErr_SetString(PyExc_TypeError,
- "will qos value must be int");
- return NULL;
- }
- }
- connectOptions.will = &willOptions;
- }
- else
- {
- PyErr_SetString(PyExc_TypeError, "will value must be dictionary");
- return NULL;
- }
- }
- if ((temp = PyDict_GetItemString(pyoptions, "username")) != NULL)
- {
- if (PyString_Check(temp))
- connectOptions.username = PyString_AsString(temp);
- else
- {
- PyErr_SetString(PyExc_TypeError, "username value must be string");
- return NULL;
- }
- }
- if ((temp = PyDict_GetItemString(pyoptions, "password")) != NULL)
- {
- if (PyString_Check(temp))
- connectOptions.username = PyString_AsString(temp);
- else
- {
- PyErr_SetString(PyExc_TypeError, "password value must be string");
- return NULL;
- }
- }
- if ((temp = PyDict_GetItemString(pyoptions, "automaticReconnect")) != NULL)
- {
- if (PyInt_Check(temp))
- connectOptions.automaticReconnect = (int) PyInt_AsLong(temp);
- else
- {
- PyErr_SetString(PyExc_TypeError, "automatic reconnect value must be int");
- return NULL;
- }
- }
- if ((temp = PyDict_GetItemString(pyoptions, "minRetryInterval")) != NULL)
- {
- if (PyInt_Check(temp))
- connectOptions.minRetryInterval = (int) PyInt_AsLong(temp);
- else
- {
- PyErr_SetString(PyExc_TypeError, "minRetryInterval value must be int");
- return NULL;
- }
- }
- if ((temp = PyDict_GetItemString(pyoptions, "maxRetryInterval")) != NULL)
- {
- if (PyInt_Check(temp))
- connectOptions.maxRetryInterval = (int) PyInt_AsLong(temp);
- else
- {
- PyErr_SetString(PyExc_TypeError, "maxRetryInterval value must be int");
- return NULL;
- }
- }
- skip:
- Py_BEGIN_ALLOW_THREADS
- rc = MQTTAsync_connect(c, &connectOptions);
- Py_END_ALLOW_THREADS
- return Py_BuildValue("i", rc);
- }
- static PyObject* mqttv3_disconnect(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
- int rc;
- if (!PyArg_ParseTuple(args, "k|i", &c, &options.timeout))
- return NULL;
- Py_BEGIN_ALLOW_THREADS
- rc = MQTTAsync_disconnect(c, &options);
- Py_END_ALLOW_THREADS
- return Py_BuildValue("i", rc);
- }
- static PyObject* mqttv3_isConnected(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- int rc;
- if (!PyArg_ParseTuple(args, "k", &c))
- return NULL;
- Py_BEGIN_ALLOW_THREADS
- rc = MQTTAsync_isConnected(c);
- Py_END_ALLOW_THREADS
- return Py_BuildValue("i", rc);
- }
- static PyObject* mqttv3_subscribe(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
- PyObject *pyoptions = NULL;
- char* topic;
- int qos = 2;
- int rc;
- if (!PyArg_ParseTuple(args, "ks|iO", &c, &topic, &qos, &pyoptions))
- return NULL;
- if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE))
- return NULL;
- Py_BEGIN_ALLOW_THREADS;
- rc = MQTTAsync_subscribe(c, topic, qos, &response);
- Py_END_ALLOW_THREADS;
- return Py_BuildValue("i", rc);
- }
- static PyObject* mqttv3_subscribeMany(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
- PyObject* topicList;
- PyObject* qosList;
- PyObject *pyoptions = NULL;
- int count;
- char** topics;
- int* qoss;
- int i, rc = 0;
- if (!PyArg_ParseTuple(args, "kOO|O", &c, &topicList, &qosList, &pyoptions))
- return NULL;
- if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE))
- return NULL;
- if (!PySequence_Check(topicList) || !PySequence_Check(qosList))
- {
- PyErr_SetString(PyExc_TypeError,
- "3rd and 4th parameters must be sequences");
- return NULL;
- }
- if ((count = PySequence_Length(topicList)) != PySequence_Length(qosList))
- {
- PyErr_SetString(PyExc_TypeError,
- "3rd and 4th parameters must be sequences of the same length");
- return NULL;
- }
- topics = malloc(count * sizeof(char*));
- for (i = 0; i < count; ++i)
- topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
- qoss = malloc(count * sizeof(int));
- for (i = 0; i < count; ++i)
- qoss[i] = (int) PyInt_AsLong(PySequence_GetItem(qosList, i));
- Py_BEGIN_ALLOW_THREADS
- rc = MQTTAsync_subscribeMany(c, count, topics, qoss, &response);
- Py_END_ALLOW_THREADS
- for (i = 0; i < count; ++i)
- PySequence_SetItem(qosList, i, PyInt_FromLong((long) qoss[i]));
- free(topics);
- free(qoss);
- if (rc == MQTTASYNC_SUCCESS)
- return Py_BuildValue("iO", rc, qosList);
- else
- return Py_BuildValue("i", rc);
- }
- static PyObject* mqttv3_unsubscribe(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
- PyObject *pyoptions = NULL;
- char* topic;
- int rc;
- if (!PyArg_ParseTuple(args, "ks|O", &c, &topic, &pyoptions))
- return NULL;
- if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE))
- return NULL;
- Py_BEGIN_ALLOW_THREADS
- rc = MQTTAsync_unsubscribe(c, topic, &response);
- Py_END_ALLOW_THREADS
- return Py_BuildValue("i", rc);
- }
- static PyObject* mqttv3_unsubscribeMany(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
- PyObject* topicList;
- PyObject *pyoptions = NULL;
- int count;
- char** topics;
- int i, rc = 0;
- if (!PyArg_ParseTuple(args, "kO|O", &c, &topicList, &pyoptions))
- return NULL;
- if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE))
- return NULL;
- if (!PySequence_Check(topicList))
- {
- PyErr_SetString(PyExc_TypeError, "3rd parameter must be sequences");
- return NULL;
- }
- count = PySequence_Length(topicList);
- topics = malloc(count * sizeof(char*));
- for (i = 0; i < count; ++i)
- topics[i] = PyString_AsString(PySequence_GetItem(topicList, i));
- Py_BEGIN_ALLOW_THREADS
- rc = MQTTAsync_unsubscribeMany(c, count, topics, &response);
- Py_END_ALLOW_THREADS
- free(topics);
- return Py_BuildValue("i", rc);
- }
- static PyObject* mqttv3_send(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- char* destinationName;
- int payloadlen;
- void* payload;
- int qos = 0;
- int retained = 0;
- MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
- PyObject *pyoptions = NULL;
- int rc;
- if (!PyArg_ParseTuple(args, "kss#|iiO", &c, &destinationName, &payload,
- &payloadlen, &qos, &retained, &pyoptions))
- return NULL;
- if (!getResponseOptions(c, pyoptions, &response, PUBLISH))
- return NULL;
- Py_BEGIN_ALLOW_THREADS
- rc = MQTTAsync_send(c, destinationName, payloadlen, payload, qos, retained, &response);
- Py_END_ALLOW_THREADS
- if (rc == MQTTASYNC_SUCCESS && qos > 0)
- return Py_BuildValue("ii", rc, response);
- else
- return Py_BuildValue("i", rc);
- }
- static PyObject* mqttv3_sendMessage(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- char* destinationName;
- PyObject *message, *temp;
- MQTTAsync_message msg = MQTTAsync_message_initializer;
- MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer;
- PyObject *pyoptions = NULL;
- int rc;
- if (!PyArg_ParseTuple(args, "ksO|O", &c, &destinationName, &message, &pyoptions))
- return NULL;
- if (!getResponseOptions(c, pyoptions, &response, PUBLISH))
- return NULL;
- if (!PyDict_Check(message))
- {
- PyErr_SetString(PyExc_TypeError, "3rd parameter must be a dictionary");
- return NULL;
- }
- if ((temp = PyDict_GetItemString(message, "payload")) == NULL)
- {
- PyErr_SetString(PyExc_TypeError, "dictionary must have payload key");
- return NULL;
- }
- if (PyString_Check(temp))
- PyString_AsStringAndSize(temp, (char**) &msg.payload,
- (Py_ssize_t*) &msg.payloadlen);
- else
- {
- PyErr_SetString(PyExc_TypeError, "payload value must be string");
- return NULL;
- }
- if ((temp = PyDict_GetItemString(message, "qos")) == NULL)
- msg.qos = (int) PyInt_AsLong(temp);
- if ((temp = PyDict_GetItemString(message, "retained")) == NULL)
- msg.retained = (int) PyInt_AsLong(temp);
- Py_BEGIN_ALLOW_THREADS
- rc = MQTTAsync_sendMessage(c, destinationName, &msg, &response);
- Py_END_ALLOW_THREADS
- if (rc == MQTTASYNC_SUCCESS && msg.qos > 0)
- return Py_BuildValue("ii", rc, response);
- else
- return Py_BuildValue("i", rc);
- }
- static PyObject* mqttv3_waitForCompletion(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- unsigned long timeout = 1000L;
- MQTTAsync_token dt;
- int rc;
- if (!PyArg_ParseTuple(args, "ki|i", &c, &dt, &timeout))
- return NULL;
- Py_BEGIN_ALLOW_THREADS
- rc = MQTTAsync_waitForCompletion(c, dt, timeout);
- Py_END_ALLOW_THREADS
- return Py_BuildValue("i", rc);
- }
- static PyObject* mqttv3_getPendingTokens(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- MQTTAsync_token* tokens;
- int rc;
- if (!PyArg_ParseTuple(args, "k", &c))
- return NULL;
- Py_BEGIN_ALLOW_THREADS
- rc = MQTTAsync_getPendingTokens(c, &tokens);
- Py_END_ALLOW_THREADS
- if (rc == MQTTASYNC_SUCCESS)
- {
- int i = 0;
- PyObject* dts = PyList_New(0);
- while (tokens[i] != -1)
- PyList_Append(dts, PyInt_FromLong((long) tokens[i]));
- return Py_BuildValue("iO", rc, dts);
- }
- else
- return Py_BuildValue("i", rc);
- }
- static PyObject* mqttv3_destroy(PyObject* self, PyObject *args)
- {
- MQTTAsync c;
- ListElement* temp = NULL;
- if (!PyArg_ParseTuple(args, "k", &c))
- return NULL;
- if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL)
- {
- ListDetach(callbacks, temp->content);
- free(temp->content);
- }
- if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL)
- {
- ListDetach(connected_callbacks, temp->content);
- free(temp->content);
- }
- MQTTAsync_destroy(&c);
- Py_INCREF(Py_None);
- return Py_None;
- }
- static PyMethodDef MqttV3Methods[] =
- {
- { "create", mqttv3_create, METH_VARARGS, "Create an MQTTv3 client." },
- { "setcallbacks", mqttv3_setcallbacks, METH_VARARGS,
- "Sets the callback functions for a particular client." },
- { "setconnected", mqttv3_setconnected, METH_VARARGS,
- "Sets the connected callback function for a particular client." },
- { "connect", mqttv3_connect, METH_VARARGS,
- "Connects to a server using the specified options." },
- { "disconnect", mqttv3_disconnect, METH_VARARGS,
- "Disconnects from a server." },
- { "isConnected", mqttv3_isConnected, METH_VARARGS,
- "Determines if this client is currently connected to the server." },
- { "subscribe", mqttv3_subscribe, METH_VARARGS,
- "Subscribe to the given topic." },
- { "subscribeMany", mqttv3_subscribeMany, METH_VARARGS,
- "Subscribe to the given topics." },
- { "unsubscribe", mqttv3_unsubscribe, METH_VARARGS,
- "Unsubscribe from the given topic." },
- { "unsubscribeMany", mqttv3_unsubscribeMany, METH_VARARGS,
- "Unsubscribe from the given topics." },
- { "send", mqttv3_send, METH_VARARGS,
- "Publish a message to the given topic." },
- { "sendMessage", mqttv3_sendMessage, METH_VARARGS,
- "Publish a message to the given topic." },
- { "waitForCompletion", mqttv3_waitForCompletion, METH_VARARGS,
- "Waits for the completion of the delivery of the message represented by a delivery token." },
- { "getPendingTokens", mqttv3_getPendingTokens, METH_VARARGS,
- "Returns the tokens pending of completion." },
- { "destroy", mqttv3_destroy, METH_VARARGS,
- "Free memory allocated to a MQTT client. It is the opposite to create." },
- { NULL, NULL, 0, NULL } /* Sentinel */
- };
- PyMODINIT_FUNC initpaho_mqtt3a(void)
- {
- PyObject *m;
- PyEval_InitThreads();
- callbacks = ListInitialize();
- connected_callbacks = ListInitialize();
- m = Py_InitModule("paho_mqtt3a", MqttV3Methods);
- if (m == NULL)
- return;
- MqttV3Error = PyErr_NewException("paho_mqtt3a.error", NULL, NULL);
- Py_INCREF(MqttV3Error);
- PyModule_AddObject(m, "error", MqttV3Error);
- PyModule_AddIntConstant(m, "SUCCESS", MQTTASYNC_SUCCESS);
- PyModule_AddIntConstant(m, "FAILURE", MQTTASYNC_FAILURE);
- PyModule_AddIntConstant(m, "DISCONNECTED", MQTTASYNC_DISCONNECTED);
- PyModule_AddIntConstant(m, "MAX_MESSAGES_INFLIGHT", MQTTASYNC_MAX_MESSAGES_INFLIGHT);
- PyModule_AddIntConstant(m, "BAD_UTF8_STRING", MQTTASYNC_BAD_UTF8_STRING);
- PyModule_AddIntConstant(m, "BAD_NULL_PARAMETER", MQTTASYNC_NULL_PARAMETER);
- PyModule_AddIntConstant(m, "BAD_TOPICNAME_TRUNCATED", MQTTASYNC_TOPICNAME_TRUNCATED);
- PyModule_AddIntConstant(m, "PERSISTENCE_DEFAULT", MQTTCLIENT_PERSISTENCE_DEFAULT);
- PyModule_AddIntConstant(m, "PERSISTENCE_NONE", MQTTCLIENT_PERSISTENCE_NONE);
- PyModule_AddIntConstant(m, "PERSISTENCE_USER", MQTTCLIENT_PERSISTENCE_USER);
- PyModule_AddIntConstant(m, "PERSISTENCE_ERROR",
- MQTTCLIENT_PERSISTENCE_ERROR);
- }
|