12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120 |
- /*******************************************************************************
- * Copyright (c) 2009, 2022 IBM Corp., Ian Craggs and others
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v2.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- * https://www.eclipse.org/legal/epl-2.0/
- * and the Eclipse Distribution License is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- * Ian Craggs - initial API and implementation and/or initial documentation
- * Ian Craggs - bug 384016 - segv setting will message
- * Ian Craggs - bug 384053 - v1.0.0.7 - stop MQTTClient_receive on socket error
- * Ian Craggs, Allan Stockdill-Mander - add ability to connect with SSL
- * Ian Craggs - multiple server connection support
- * Ian Craggs - fix for bug 413429 - connectionLost not called
- * Ian Craggs - fix for bug 421103 - trying to write to same socket, in publish/retries
- * Ian Craggs - fix for bug 419233 - mutexes not reporting errors
- * Ian Craggs - fix for bug 420851
- * Ian Craggs - fix for bug 432903 - queue persistence
- * Ian Craggs - MQTT 3.1.1 support
- * Ian Craggs - fix for bug 438176 - MQTT version selection
- * Rong Xiang, Ian Craggs - C++ compatibility
- * Ian Craggs - fix for bug 443724 - stack corruption
- * Ian Craggs - fix for bug 447672 - simultaneous access to socket structure
- * Ian Craggs - fix for bug 459791 - deadlock in WaitForCompletion for bad client
- * Ian Craggs - fix for bug 474905 - insufficient synchronization for subscribe, unsubscribe, connect
- * Ian Craggs - make it clear that yield and receive are not intended for multi-threaded mode (bug 474748)
- * Ian Craggs - SNI support, message queue unpersist bug
- * Ian Craggs - binary will message support
- * Ian Craggs - waitforCompletion fix #240
- * Ian Craggs - check for NULL SSL options #334
- * Ian Craggs - allocate username/password buffers #431
- * Ian Craggs - MQTT 5.0 support
- * Sven Gambel - add generic proxy support
- *******************************************************************************/
- /**
- * @file
- * \brief Synchronous API implementation
- *
- */
- #include <stdlib.h>
- #include <string.h>
- #if !defined(_WIN32) && !defined(_WIN64)
- #include <sys/time.h>
- #else
- #if defined(_MSC_VER) && _MSC_VER < 1900
- #define snprintf _snprintf
- #endif
- #endif
- #include "MQTTClient.h"
- #if !defined(NO_PERSISTENCE)
- #include "MQTTPersistence.h"
- #endif
- #include "utf-8.h"
- #include "MQTTProtocol.h"
- #include "MQTTProtocolOut.h"
- #include "Thread.h"
- #include "SocketBuffer.h"
- #include "StackTrace.h"
- #include "Heap.h"
- #if defined(OPENSSL)
- #include <openssl/ssl.h>
- #else
- #define URI_SSL "ssl://"
- #define URI_MQTTS "mqtts://"
- #endif
- #include "OsWrapper.h"
- #define URI_TCP "tcp://"
- #define URI_MQTT "mqtt://"
- #define URI_WS "ws://"
- #define URI_WSS "wss://"
- #include "VersionInfo.h"
- #include "WebSocket.h"
- #include "Proxy.h"
- const char *client_timestamp_eye = "MQTTClientV3_Timestamp " BUILD_TIMESTAMP;
- const char *client_version_eye = "MQTTClientV3_Version " CLIENT_VERSION;
- struct conlost_sync_data {
- sem_type sem;
- void *m;
- };
- int MQTTClient_init(void);
- void MQTTClient_global_init(MQTTClient_init_options* inits)
- {
- MQTTClient_init();
- #if defined(OPENSSL)
- SSLSocket_handleOpensslInit(inits->do_openssl_init);
- #endif
- }
- static ClientStates ClientState =
- {
- CLIENT_VERSION, /* version */
- NULL /* client list */
- };
- ClientStates* bstate = &ClientState;
- MQTTProtocol state;
- #if defined(_WIN32) || defined(_WIN64)
- static mutex_type mqttclient_mutex = NULL;
- mutex_type socket_mutex = NULL;
- static mutex_type subscribe_mutex = NULL;
- static mutex_type unsubscribe_mutex = NULL;
- static mutex_type connect_mutex = NULL;
- #if !defined(NO_HEAP_TRACKING)
- extern mutex_type stack_mutex;
- extern mutex_type heap_mutex;
- #endif
- extern mutex_type log_mutex;
- int MQTTClient_init(void)
- {
- DWORD rc = 0;
- if (mqttclient_mutex == NULL)
- {
- if ((mqttclient_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("mqttclient_mutex error %d\n", rc);
- goto exit;
- }
- if ((subscribe_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("subscribe_mutex error %d\n", rc);
- goto exit;
- }
- if ((unsubscribe_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("unsubscribe_mutex error %d\n", rc);
- goto exit;
- }
- if ((connect_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("connect_mutex error %d\n", rc);
- goto exit;
- }
- #if !defined(NO_HEAP_TRACKING)
- if ((stack_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("stack_mutex error %d\n", rc);
- goto exit;
- }
- if ((heap_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("heap_mutex error %d\n", rc);
- goto exit;
- }
- #endif
- if ((log_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("log_mutex error %d\n", rc);
- goto exit;
- }
- if ((socket_mutex = CreateMutex(NULL, 0, NULL)) == NULL)
- {
- rc = GetLastError();
- printf("socket_mutex error %d\n", rc);
- goto exit;
- }
- }
- exit:
- return rc;
- }
- void MQTTClient_cleanup(void)
- {
- if (connect_mutex)
- CloseHandle(connect_mutex);
- if (subscribe_mutex)
- CloseHandle(subscribe_mutex);
- if (unsubscribe_mutex)
- CloseHandle(unsubscribe_mutex);
- #if !defined(NO_HEAP_TRACKING)
- if (stack_mutex)
- CloseHandle(stack_mutex);
- if (heap_mutex)
- CloseHandle(heap_mutex);
- #endif
- if (log_mutex)
- CloseHandle(log_mutex);
- if (socket_mutex)
- CloseHandle(socket_mutex);
- if (mqttclient_mutex)
- CloseHandle(mqttclient_mutex);
- }
- #if defined(PAHO_MQTT_STATIC)
- /* Global variable for one-time initialization structure */
- static INIT_ONCE g_InitOnce = INIT_ONCE_STATIC_INIT; /* Static initialization */
- /* One time initialization function */
- BOOL CALLBACK InitOnceFunction (
- PINIT_ONCE InitOnce, /* Pointer to one-time initialization structure */
- PVOID Parameter, /* Optional parameter passed by InitOnceExecuteOnce */
- PVOID *lpContext) /* Receives pointer to event object */
- {
- int rc = MQTTClient_init();
- return rc == 0;
- }
- #else
- BOOL APIENTRY DllMain(HANDLE hModule,
- DWORD ul_reason_for_call,
- LPVOID lpReserved)
- {
- switch (ul_reason_for_call)
- {
- case DLL_PROCESS_ATTACH:
- MQTTClient_init();
- break;
- case DLL_THREAD_ATTACH:
- break;
- case DLL_THREAD_DETACH:
- break;
- case DLL_PROCESS_DETACH:
- if (lpReserved)
- MQTTClient_cleanup();
- break;
- }
- return TRUE;
- }
- #endif
- #else
- static pthread_mutex_t mqttclient_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- static mutex_type mqttclient_mutex = &mqttclient_mutex_store;
- static pthread_mutex_t socket_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- mutex_type socket_mutex = &socket_mutex_store;
- static pthread_mutex_t subscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- static mutex_type subscribe_mutex = &subscribe_mutex_store;
- static pthread_mutex_t unsubscribe_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- static mutex_type unsubscribe_mutex = &unsubscribe_mutex_store;
- static pthread_mutex_t connect_mutex_store = PTHREAD_MUTEX_INITIALIZER;
- static mutex_type connect_mutex = &connect_mutex_store;
- int MQTTClient_init(void)
- {
- pthread_mutexattr_t attr;
- int rc;
- pthread_mutexattr_init(&attr);
- #if !defined(_WRS_KERNEL)
- pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
- #else
- /* #warning "no pthread_mutexattr_settype" */
- #endif /* !defined(_WRS_KERNEL) */
- if ((rc = pthread_mutex_init(mqttclient_mutex, &attr)) != 0)
- printf("MQTTClient: error %d initializing client_mutex\n", rc);
- else if ((rc = pthread_mutex_init(socket_mutex, &attr)) != 0)
- printf("MQTTClient: error %d initializing socket_mutex\n", rc);
- else if ((rc = pthread_mutex_init(subscribe_mutex, &attr)) != 0)
- printf("MQTTClient: error %d initializing subscribe_mutex\n", rc);
- else if ((rc = pthread_mutex_init(unsubscribe_mutex, &attr)) != 0)
- printf("MQTTClient: error %d initializing unsubscribe_mutex\n", rc);
- else if ((rc = pthread_mutex_init(connect_mutex, &attr)) != 0)
- printf("MQTTClient: error %d initializing connect_mutex\n", rc);
- return rc;
- }
- #define WINAPI
- #endif
- static volatile int library_initialized = 0;
- static List* handles = NULL;
- static int running = 0;
- static int tostop = 0;
- static thread_id_type run_id = 0;
- typedef struct
- {
- MQTTClient_message* msg;
- char* topicName;
- int topicLen;
- unsigned int seqno; /* only used on restore */
- } qEntry;
- typedef struct
- {
- char* serverURI;
- const char* currentServerURI; /* when using HA options, set the currently used serverURI */
- #if defined(OPENSSL)
- int ssl;
- #endif
- int websocket;
- Clients* c;
- MQTTClient_connectionLost* cl;
- MQTTClient_messageArrived* ma;
- MQTTClient_deliveryComplete* dc;
- void* context;
- MQTTClient_disconnected* disconnected;
- void* disconnected_context; /* the context to be associated with the disconnected callback*/
- MQTTClient_published* published;
- void* published_context; /* the context to be associated with the disconnected callback*/
- #if 0
- MQTTClient_authHandle* auth_handle;
- void* auth_handle_context; /* the context to be associated with the authHandle callback*/
- #endif
- sem_type connect_sem;
- int rc; /* getsockopt return code in connect */
- sem_type connack_sem;
- sem_type suback_sem;
- sem_type unsuback_sem;
- MQTTPacket* pack;
- unsigned long commandTimeout;
- } MQTTClients;
- struct props_rc_parms
- {
- MQTTClients* m;
- MQTTProperties* properties;
- enum MQTTReasonCodes reasonCode;
- };
- static void MQTTClient_terminate(void);
- static void MQTTClient_emptyMessageQueue(Clients* client);
- static int MQTTClient_deliverMessage(
- int rc, MQTTClients* m,
- char** topicName, int* topicLen,
- MQTTClient_message** message);
- static int clientSockCompare(void* a, void* b);
- static thread_return_type WINAPI connectionLost_call(void* context);
- static thread_return_type WINAPI MQTTClient_run(void* n);
- static int MQTTClient_stop(void);
- static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props);
- static int MQTTClient_cleanSession(Clients* client);
- static MQTTResponse MQTTClient_connectURIVersion(
- MQTTClient handle, MQTTClient_connectOptions* options,
- const char* serverURI, int MQTTVersion,
- START_TIME_TYPE start, ELAPSED_TIME_TYPE millisecsTimeout,
- MQTTProperties* connectProperties, MQTTProperties* willProperties);
- static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
- MQTTProperties* connectProperties, MQTTProperties* willProperties);
- static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop, enum MQTTReasonCodes, MQTTProperties*);
- static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
- static void MQTTClient_retry(void);
- static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int* rc);
- static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, int64_t timeout);
- /*static int pubCompare(void* a, void* b); */
- static void MQTTProtocol_checkPendingWrites(void);
- static void MQTTClient_writeComplete(SOCKET socket, int rc);
- static void MQTTClient_writeContinue(SOCKET socket);
- int MQTTClient_createWithOptions(MQTTClient* handle, const char* serverURI, const char* clientId,
- int persistence_type, void* persistence_context, MQTTClient_createOptions* options)
- {
- int rc = 0;
- MQTTClients *m = NULL;
- #if (defined(_WIN32) || defined(_WIN64)) && defined(PAHO_MQTT_STATIC)
- /* intializes mutexes once. Must come before FUNC_ENTRY */
- BOOL bStatus = InitOnceExecuteOnce(&g_InitOnce, InitOnceFunction, NULL, NULL);
- #endif
- FUNC_ENTRY;
- if ((rc = Thread_lock_mutex(mqttclient_mutex)) != 0)
- goto nounlock_exit;
- if (serverURI == NULL || clientId == NULL)
- {
- rc = MQTTCLIENT_NULL_PARAMETER;
- goto exit;
- }
- if (!UTF8_validateString(clientId))
- {
- rc = MQTTCLIENT_BAD_UTF8_STRING;
- goto exit;
- }
- if (strlen(clientId) == 0 && persistence_type == MQTTCLIENT_PERSISTENCE_DEFAULT)
- {
- rc = MQTTCLIENT_PERSISTENCE_ERROR;
- goto exit;
- }
- if (strstr(serverURI, "://") != NULL)
- {
- if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) != 0
- && strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) != 0
- && strncmp(URI_WS, serverURI, strlen(URI_WS)) != 0
- #if defined(OPENSSL)
- && strncmp(URI_SSL, serverURI, strlen(URI_SSL)) != 0
- && strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) != 0
- && strncmp(URI_WSS, serverURI, strlen(URI_WSS)) != 0
- #endif
- )
- {
- rc = MQTTCLIENT_BAD_PROTOCOL;
- goto exit;
- }
- }
- if (options && (strncmp(options->struct_id, "MQCO", 4) != 0 || options->struct_version != 0))
- {
- rc = MQTTCLIENT_BAD_STRUCTURE;
- goto exit;
- }
- if (!library_initialized)
- {
- #if !defined(NO_HEAP_TRACKING)
- Heap_initialize();
- #endif
- Log_initialize((Log_nameValue*)MQTTClient_getVersionInfo());
- bstate->clients = ListInitialize();
- Socket_outInitialize();
- Socket_setWriteCompleteCallback(MQTTClient_writeComplete);
- Socket_setWriteContinueCallback(MQTTClient_writeContinue);
- Socket_setWriteAvailableCallback(MQTTProtocol_writeAvailable);
- handles = ListInitialize();
- #if defined(OPENSSL)
- SSLSocket_initialize();
- #endif
- library_initialized = 1;
- }
- if ((m = malloc(sizeof(MQTTClients))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- *handle = m;
- memset(m, '\0', sizeof(MQTTClients));
- m->commandTimeout = 10000L;
- if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
- serverURI += strlen(URI_TCP);
- else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
- serverURI += strlen(URI_MQTT);
- else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
- {
- serverURI += strlen(URI_WS);
- m->websocket = 1;
- }
- else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
- {
- #if defined(OPENSSL)
- serverURI += strlen(URI_SSL);
- m->ssl = 1;
- #else
- rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
- goto exit;
- #endif
- }
- else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
- {
- #if defined(OPENSSL)
- serverURI += strlen(URI_MQTTS);
- m->ssl = 1;
- #else
- rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
- goto exit;
- #endif
- }
- else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
- {
- #if defined(OPENSSL)
- serverURI += strlen(URI_WSS);
- m->ssl = 1;
- m->websocket = 1;
- #else
- rc = MQTTCLIENT_SSL_NOT_SUPPORTED;
- goto exit;
- #endif
- }
- m->serverURI = MQTTStrdup(serverURI);
- ListAppend(handles, m, sizeof(MQTTClients));
- if ((m->c = malloc(sizeof(Clients))) == NULL)
- {
- ListRemove(handles, m);
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memset(m->c, '\0', sizeof(Clients));
- m->c->context = m;
- m->c->MQTTVersion = (options) ? options->MQTTVersion : MQTTVERSION_DEFAULT;
- m->c->outboundMsgs = ListInitialize();
- m->c->inboundMsgs = ListInitialize();
- m->c->messageQueue = ListInitialize();
- m->c->outboundQueue = ListInitialize();
- m->c->clientID = MQTTStrdup(clientId);
- m->connect_sem = Thread_create_sem(&rc);
- m->connack_sem = Thread_create_sem(&rc);
- m->suback_sem = Thread_create_sem(&rc);
- m->unsuback_sem = Thread_create_sem(&rc);
- #if !defined(NO_PERSISTENCE)
- rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
- if (rc == 0)
- {
- rc = MQTTPersistence_initialize(m->c, m->serverURI);
- if (rc == 0)
- MQTTPersistence_restoreMessageQueue(m->c);
- }
- #endif
- ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
- exit:
- Thread_unlock_mutex(mqttclient_mutex);
- nounlock_exit:
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTClient_create(MQTTClient* handle, const char* serverURI, const char* clientId,
- int persistence_type, void* persistence_context)
- {
- return MQTTClient_createWithOptions(handle, serverURI, clientId, persistence_type,
- persistence_context, NULL);
- }
- static void MQTTClient_terminate(void)
- {
- FUNC_ENTRY;
- MQTTClient_stop();
- if (library_initialized)
- {
- ListFree(bstate->clients);
- ListFree(handles);
- handles = NULL;
- WebSocket_terminate();
- #if !defined(NO_HEAP_TRACKING)
- Heap_terminate();
- #endif
- Log_terminate();
- library_initialized = 0;
- }
- FUNC_EXIT;
- }
- static void MQTTClient_emptyMessageQueue(Clients* client)
- {
- FUNC_ENTRY;
- /* empty message queue */
- if (client->messageQueue->count > 0)
- {
- ListElement* current = NULL;
- while (ListNextElement(client->messageQueue, ¤t))
- {
- qEntry* qe = (qEntry*)(current->content);
- free(qe->topicName);
- MQTTProperties_free(&qe->msg->properties);
- free(qe->msg->payload);
- free(qe->msg);
- }
- ListEmpty(client->messageQueue);
- }
- FUNC_EXIT;
- }
- void MQTTClient_destroy(MQTTClient* handle)
- {
- MQTTClients* m = *handle;
- FUNC_ENTRY;
- Thread_lock_mutex(connect_mutex);
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL)
- goto exit;
- if (m->c)
- {
- SOCKET saved_socket = m->c->net.socket;
- char* saved_clientid = MQTTStrdup(m->c->clientID);
- #if !defined(NO_PERSISTENCE)
- MQTTPersistence_close(m->c);
- #endif
- MQTTClient_emptyMessageQueue(m->c);
- MQTTProtocol_freeClient(m->c);
- if (!ListRemove(bstate->clients, m->c))
- Log(LOG_ERROR, 0, NULL);
- else
- Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
- free(saved_clientid);
- }
- if (m->serverURI)
- free(m->serverURI);
- Thread_destroy_sem(m->connect_sem);
- Thread_destroy_sem(m->connack_sem);
- Thread_destroy_sem(m->suback_sem);
- Thread_destroy_sem(m->unsuback_sem);
- if (!ListRemove(handles, m))
- Log(LOG_ERROR, -1, "free error");
- *handle = NULL;
- if (bstate->clients->count == 0)
- MQTTClient_terminate();
- exit:
- Thread_unlock_mutex(mqttclient_mutex);
- Thread_unlock_mutex(connect_mutex);
- FUNC_EXIT;
- }
- void MQTTClient_freeMessage(MQTTClient_message** message)
- {
- FUNC_ENTRY;
- MQTTProperties_free(&(*message)->properties);
- free((*message)->payload);
- free(*message);
- *message = NULL;
- FUNC_EXIT;
- }
- void MQTTClient_free(void* memory)
- {
- FUNC_ENTRY;
- free(memory);
- FUNC_EXIT;
- }
- void MQTTResponse_free(MQTTResponse response)
- {
- FUNC_ENTRY;
- if (response.reasonCodeCount > 0 && response.reasonCodes)
- free(response.reasonCodes);
- if (response.properties)
- {
- MQTTProperties_free(response.properties);
- free(response.properties);
- }
- FUNC_EXIT;
- }
- static int MQTTClient_deliverMessage(int rc, MQTTClients* m, char** topicName, int* topicLen, MQTTClient_message** message)
- {
- qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
- FUNC_ENTRY;
- *message = qe->msg;
- *topicName = qe->topicName;
- *topicLen = qe->topicLen;
- if (strlen(*topicName) != *topicLen)
- rc = MQTTCLIENT_TOPICNAME_TRUNCATED;
- #if !defined(NO_PERSISTENCE)
- if (m->c->persistence)
- MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
- #endif
- ListRemove(m->c->messageQueue, m->c->messageQueue->first->content);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- /**
- * List callback function for comparing clients by socket
- * @param a first integer value
- * @param b second integer value
- * @return boolean indicating whether a and b are equal
- */
- static int clientSockCompare(void* a, void* b)
- {
- MQTTClients* m = (MQTTClients*)a;
- return m->c->net.socket == *(int*)b;
- }
- /**
- * Wrapper function to call connection lost on a separate thread. A separate thread is needed to allow the
- * connectionLost function to make API calls (e.g. connect)
- * @param context a pointer to the relevant client
- * @return thread_return_type standard thread return value - not used here
- */
- static thread_return_type WINAPI connectionLost_call(void* context)
- {
- struct conlost_sync_data *data = (struct conlost_sync_data *)context;
- MQTTClients* m = (MQTTClients *)data->m;
- (*(m->cl))(m->context, NULL);
- Thread_post_sem(data->sem);
- return 0;
- }
- int MQTTClient_setDisconnected(MQTTClient handle, void* context, MQTTClient_disconnected* disconnected)
- {
- int rc = MQTTCLIENT_SUCCESS;
- MQTTClients* m = handle;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
- rc = MQTTCLIENT_FAILURE;
- else
- {
- m->disconnected_context = context;
- m->disconnected = disconnected;
- }
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- /**
- * Wrapper function to call disconnected on a separate thread. A separate thread is needed to allow the
- * disconnected function to make API calls (e.g. connect)
- * @param context a pointer to the relevant client
- * @return thread_return_type standard thread return value - not used here
- */
- static thread_return_type WINAPI call_disconnected(void* context)
- {
- struct props_rc_parms* pr = (struct props_rc_parms*)context;
- (*(pr->m->disconnected))(pr->m->disconnected_context, pr->properties, pr->reasonCode);
- MQTTProperties_free(pr->properties);
- free(pr->properties);
- free(pr);
- return 0;
- }
- int MQTTClient_setPublished(MQTTClient handle, void* context, MQTTClient_published* published)
- {
- int rc = MQTTCLIENT_SUCCESS;
- MQTTClients* m = handle;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL || m->c->connect_state != NOT_IN_PROGRESS)
- rc = MQTTCLIENT_FAILURE;
- else
- {
- m->published_context = context;
- m->published = published;
- }
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
#if 0
/* Disabled code: AUTH packet handling is compiled out. */

/**
 * Register the auth_handle callback and its context for a client.
 * @param handle the client to update
 * @param context application pointer handed back to the callback
 * @param auth_handle the callback function
 * @return MQTTCLIENT_SUCCESS, or MQTTCLIENT_FAILURE if handle is NULL or the
 * client's connect_state is not NOT_IN_PROGRESS
 */
int MQTTClient_setHandleAuth(MQTTClient handle, void* context, MQTTClient_handleAuth* auth_handle)
{
    MQTTClients* m = handle;
    int rc = MQTTCLIENT_FAILURE;

    FUNC_ENTRY;
    Thread_lock_mutex(mqttclient_mutex);

    if (m != NULL && m->c->connect_state == NOT_IN_PROGRESS)
    {
        m->auth_handle_context = context;
        m->auth_handle = auth_handle;
        rc = MQTTCLIENT_SUCCESS;
    }

    Thread_unlock_mutex(mqttclient_mutex);
    FUNC_EXIT_RC(rc);
    return rc;
}

/**
 * Thread entry point that invokes the application's auth_handle callback.
 * Running it on its own thread lets the callback make API calls
 * (e.g. MQTTClient_auth).
 * @param context a pointer to a struct props_rc_parms
 * @return thread_return_type standard thread return value - not used here
 */
static thread_return_type WINAPI call_auth_handle(void* context)
{
    struct props_rc_parms* parms = (struct props_rc_parms*)context;

    parms->m->auth_handle(parms->m->auth_handle_context, parms->properties, parms->reasonCode);

    MQTTProperties_free(parms->properties);
    free(parms->properties);
    free(parms);
    return 0;
}
#endif
- /* This is the thread function that handles the calling of callback functions if set */
- static thread_return_type WINAPI MQTTClient_run(void* n)
- {
- long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
- FUNC_ENTRY;
- Thread_set_name("MQTTClient_run");
- Thread_lock_mutex(mqttclient_mutex);
- run_id = Thread_getid();
- running = 1;
- while (!tostop)
- {
- int rc = SOCKET_ERROR;
- SOCKET sock = -1;
- MQTTClients* m = NULL;
- MQTTPacket* pack = NULL;
- Thread_unlock_mutex(mqttclient_mutex);
- pack = MQTTClient_cycle(&sock, timeout, &rc);
- Thread_lock_mutex(mqttclient_mutex);
- if (tostop)
- break;
- timeout = 100L;
- /* find client corresponding to socket */
- if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
- {
- /* assert: should not happen */
- continue;
- }
- m = (MQTTClient)(handles->current->content);
- if (m == NULL)
- {
- /* assert: should not happen */
- continue;
- }
- if (rc == SOCKET_ERROR)
- {
- if (m->c->connected)
- MQTTClient_disconnect_internal(m, 0);
- else
- {
- if (m->c->connect_state == SSL_IN_PROGRESS)
- {
- Log(TRACE_MIN, -1, "Posting connect semaphore for client %s", m->c->clientID);
- m->c->connect_state = NOT_IN_PROGRESS;
- Thread_post_sem(m->connect_sem);
- }
- if (m->c->connect_state == WAIT_FOR_CONNACK)
- {
- Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
- m->c->connect_state = NOT_IN_PROGRESS;
- Thread_post_sem(m->connack_sem);
- }
- }
- }
- else
- {
- if (m->c->messageQueue->count > 0 && m->ma)
- {
- qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
- int topicLen = qe->topicLen;
- if (strlen(qe->topicName) == topicLen)
- topicLen = 0;
- Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
- m->c->clientID, m->c->messageQueue->count);
- Thread_unlock_mutex(mqttclient_mutex);
- rc = (*(m->ma))(m->context, qe->topicName, topicLen, qe->msg);
- Thread_lock_mutex(mqttclient_mutex);
- /* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
- * the queue, and it will be retried later. If 1 is returned then the message data may have been freed,
- * so we must be careful how we use it.
- */
- if (rc)
- {
- #if !defined(NO_PERSISTENCE)
- if (m->c->persistence)
- MQTTPersistence_unpersistQueueEntry(m->c, (MQTTPersistence_qEntry*)qe);
- #endif
- ListRemove(m->c->messageQueue, qe);
- }
- else
- Log(TRACE_MIN, -1, "False returned from messageArrived for client %s, message remains on queue",
- m->c->clientID);
- }
- if (pack)
- {
- if (pack->header.bits.type == CONNACK)
- {
- Log(TRACE_MIN, -1, "Posting connack semaphore for client %s", m->c->clientID);
- m->pack = pack;
- Thread_post_sem(m->connack_sem);
- }
- else if (pack->header.bits.type == SUBACK)
- {
- Log(TRACE_MIN, -1, "Posting suback semaphore for client %s", m->c->clientID);
- m->pack = pack;
- Thread_post_sem(m->suback_sem);
- }
- else if (pack->header.bits.type == UNSUBACK)
- {
- Log(TRACE_MIN, -1, "Posting unsuback semaphore for client %s", m->c->clientID);
- m->pack = pack;
- Thread_post_sem(m->unsuback_sem);
- }
- else if (m->c->MQTTVersion >= MQTTVERSION_5)
- {
- if (pack->header.bits.type == DISCONNECT && m->disconnected)
- {
- struct props_rc_parms* dp;
- Ack* disc = (Ack*)pack;
- dp = malloc(sizeof(struct props_rc_parms));
- if (dp)
- {
- dp->m = m;
- dp->reasonCode = disc->rc;
- dp->properties = malloc(sizeof(MQTTProperties));
- if (dp->properties)
- {
- *(dp->properties) = disc->properties;
- MQTTClient_disconnect1(m, 10, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
- Log(TRACE_MIN, -1, "Calling disconnected for client %s", m->c->clientID);
- Thread_start(call_disconnected, dp);
- }
- else
- free(dp);
- }
- free(disc);
- }
- #if 0
- if (pack->header.bits.type == AUTH && m->auth_handle)
- {
- struct props_rc_parms dp;
- Ack* disc = (Ack*)pack;
- dp.m = m;
- dp.properties = &disc->properties;
- dp.reasonCode = disc->rc;
- free(pack);
- Log(TRACE_MIN, -1, "Calling auth_handle for client %s", m->c->clientID);
- Thread_start(call_auth_handle, &dp);
- }
- #endif
- }
- }
- else if (m->c->connect_state == TCP_IN_PROGRESS)
- {
- int error;
- socklen_t len = sizeof(error);
- if ((m->rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
- m->rc = error;
- Log(TRACE_MIN, -1, "Posting connect semaphore for client %s rc %d", m->c->clientID, m->rc);
- m->c->connect_state = NOT_IN_PROGRESS;
- Thread_post_sem(m->connect_sem);
- }
- #if defined(OPENSSL)
- else if (m->c->connect_state == SSL_IN_PROGRESS)
- {
- rc = m->c->sslopts->struct_version >= 3 ?
- SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
- m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
- SSLSocket_connect(m->c->net.ssl, m->c->net.socket, m->serverURI,
- m->c->sslopts->verify, NULL, NULL);
- if (rc == 1 || rc == SSL_FATAL)
- {
- if (rc == 1 && (m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
- m->c->session = SSL_get1_session(m->c->net.ssl);
- m->rc = rc;
- Log(TRACE_MIN, -1, "Posting connect semaphore for SSL client %s rc %d", m->c->clientID, m->rc);
- m->c->connect_state = NOT_IN_PROGRESS;
- Thread_post_sem(m->connect_sem);
- }
- }
- #endif
- else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
- {
- if (rc != TCPSOCKET_INTERRUPTED)
- {
- Log(TRACE_MIN, -1, "Posting websocket handshake for client %s rc %d", m->c->clientID, m->rc);
- m->c->connect_state = WAIT_FOR_CONNACK;
- Thread_post_sem(m->connect_sem);
- }
- }
- }
- }
- run_id = 0;
- running = tostop = 0;
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT;
- #if defined(_WIN32) || defined(_WIN64)
- ExitThread(0);
- #endif
- return 0;
- }
- static int MQTTClient_stop(void)
- {
- int rc = 0;
- FUNC_ENTRY;
- if (running == 1 && tostop == 0)
- {
- int conn_count = 0;
- ListElement* current = NULL;
- if (handles != NULL)
- {
- /* find out how many handles are still connected */
- while (ListNextElement(handles, ¤t))
- {
- if (((MQTTClients*)(current->content))->c->connect_state > NOT_IN_PROGRESS ||
- ((MQTTClients*)(current->content))->c->connected)
- ++conn_count;
- }
- }
- Log(TRACE_MIN, -1, "Conn_count is %d", conn_count);
- /* stop the background thread, if we are the last one to be using it */
- if (conn_count == 0)
- {
- int count = 0;
- tostop = 1;
- if (Thread_getid() != run_id)
- {
- while (running && ++count < 100)
- {
- Thread_unlock_mutex(mqttclient_mutex);
- Log(TRACE_MIN, -1, "sleeping");
- MQTTTime_sleep(100L);
- Thread_lock_mutex(mqttclient_mutex);
- }
- }
- rc = 1;
- }
- }
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTClient_setCallbacks(MQTTClient handle, void* context, MQTTClient_connectionLost* cl,
- MQTTClient_messageArrived* ma, MQTTClient_deliveryComplete* dc)
- {
- int rc = MQTTCLIENT_SUCCESS;
- MQTTClients* m = handle;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL || ma == NULL || m->c->connect_state != NOT_IN_PROGRESS)
- rc = MQTTCLIENT_FAILURE;
- else
- {
- m->context = context;
- m->cl = cl;
- m->ma = ma;
- m->dc = dc;
- }
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- static void MQTTClient_closeSession(Clients* client, enum MQTTReasonCodes reason, MQTTProperties* props)
- {
- FUNC_ENTRY;
- client->good = 0;
- client->ping_outstanding = 0;
- client->ping_due = 0;
- if (client->net.socket > 0)
- {
- if (client->connected)
- MQTTPacket_send_disconnect(client, reason, props);
- Thread_lock_mutex(socket_mutex);
- WebSocket_close(&client->net, WebSocket_CLOSE_NORMAL, NULL);
- #if defined(OPENSSL)
- SSL_SESSION_free(client->session); /* is a no-op if session is NULL */
- client->session = NULL; /* show the session has been freed */
- SSLSocket_close(&client->net);
- #endif
- Socket_close(client->net.socket);
- Thread_unlock_mutex(socket_mutex);
- client->net.socket = 0;
- #if defined(OPENSSL)
- client->net.ssl = NULL;
- #endif
- }
- client->connected = 0;
- client->connect_state = NOT_IN_PROGRESS;
- if (client->MQTTVersion < MQTTVERSION_5 && client->cleansession)
- MQTTClient_cleanSession(client);
- FUNC_EXIT;
- }
- static int MQTTClient_cleanSession(Clients* client)
- {
- int rc = 0;
- FUNC_ENTRY;
- #if !defined(NO_PERSISTENCE)
- rc = MQTTPersistence_clear(client);
- #endif
- MQTTProtocol_emptyMessageList(client->inboundMsgs);
- MQTTProtocol_emptyMessageList(client->outboundMsgs);
- MQTTClient_emptyMessageQueue(client);
- client->msgID = 0;
- FUNC_EXIT_RC(rc);
- return rc;
- }
- void Protocol_processPublication(Publish* publish, Clients* client, int allocatePayload)
- {
- qEntry* qe = NULL;
- MQTTClient_message* mm = NULL;
- MQTTClient_message initialized = MQTTClient_message_initializer;
- FUNC_ENTRY;
- qe = malloc(sizeof(qEntry));
- if (!qe)
- goto exit;
- mm = malloc(sizeof(MQTTClient_message));
- if (!mm)
- {
- free(qe);
- goto exit;
- }
- memcpy(mm, &initialized, sizeof(MQTTClient_message));
- qe->msg = mm;
- qe->topicName = publish->topic;
- qe->topicLen = publish->topiclen;
- publish->topic = NULL;
- if (allocatePayload)
- {
- mm->payload = malloc(publish->payloadlen);
- if (mm->payload == NULL)
- {
- free(mm);
- free(qe);
- goto exit;
- }
- memcpy(mm->payload, publish->payload, publish->payloadlen);
- }
- else
- mm->payload = publish->payload;
- mm->payloadlen = publish->payloadlen;
- mm->qos = publish->header.bits.qos;
- mm->retained = publish->header.bits.retain;
- if (publish->header.bits.qos == 2)
- mm->dup = 0; /* ensure that a QoS2 message is not passed to the application with dup = 1 */
- else
- mm->dup = publish->header.bits.dup;
- mm->msgid = publish->msgId;
- if (publish->MQTTVersion >= 5)
- mm->properties = MQTTProperties_copy(&publish->properties);
- ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
- #if !defined(NO_PERSISTENCE)
- if (client->persistence)
- MQTTPersistence_persistQueueEntry(client, (MQTTPersistence_qEntry*)qe);
- #endif
- exit:
- FUNC_EXIT;
- }
- static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, int MQTTVersion,
- START_TIME_TYPE start, ELAPSED_TIME_TYPE millisecsTimeout, MQTTProperties* connectProperties, MQTTProperties* willProperties)
- {
- MQTTClients* m = handle;
- int rc = SOCKET_ERROR;
- int sessionPresent = 0;
- MQTTResponse resp = MQTTResponse_initializer;
- FUNC_ENTRY;
- resp.reasonCode = SOCKET_ERROR;
- if (m->ma && !running)
- {
- int count = 0;
- Thread_start(MQTTClient_run, handle);
- if (MQTTTime_elapsed(start) >= millisecsTimeout)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- while (!running && ++count < 5)
- {
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTTime_sleep(100L);
- Thread_lock_mutex(mqttclient_mutex);
- }
- if (!running)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, MQTTVersion);
- #if defined(OPENSSL)
- #if defined(__GNUC__) && defined(__linux__)
- rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, m->websocket, MQTTVersion, connectProperties, willProperties,
- millisecsTimeout - MQTTTime_elapsed(start));
- #else
- rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, m->websocket, MQTTVersion, connectProperties, willProperties);
- #endif
- #else
- #if defined(__GNUC__) && defined(__linux__)
- rc = MQTTProtocol_connect(serverURI, m->c, m->websocket, MQTTVersion, connectProperties, willProperties,
- millisecsTimeout - MQTTTime_elapsed(start));
- #else
- rc = MQTTProtocol_connect(serverURI, m->c, m->websocket, MQTTVersion, connectProperties, willProperties);
- #endif
- #endif
- if (rc == SOCKET_ERROR)
- goto exit;
- if (m->c->connect_state == NOT_IN_PROGRESS)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- if (m->c->connect_state == TCP_IN_PROGRESS) /* TCP connect started - wait for completion */
- {
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
- Thread_lock_mutex(mqttclient_mutex);
- if (rc != 0)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- #if defined(OPENSSL)
- if (m->ssl)
- {
- int port1;
- size_t hostname_len;
- const char *topic;
- int setSocketForSSLrc = 0;
- if (m->c->net.https_proxy) {
- m->c->connect_state = PROXY_CONNECT_IN_PROGRESS;
- if ((rc = Proxy_connect( &m->c->net, 1, serverURI)) == SOCKET_ERROR )
- goto exit;
- }
- hostname_len = MQTTProtocol_addressPort(serverURI, &port1, &topic, MQTT_DEFAULT_PORT);
- setSocketForSSLrc = SSLSocket_setSocketForSSL(&m->c->net, m->c->sslopts,
- serverURI, hostname_len);
- if (setSocketForSSLrc != MQTTCLIENT_SUCCESS)
- {
- if (m->c->session != NULL)
- if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
- Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
- rc = m->c->sslopts->struct_version >= 3 ?
- SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
- m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
- SSLSocket_connect(m->c->net.ssl, m->c->net.socket, serverURI,
- m->c->sslopts->verify, NULL, NULL);
- if (rc == TCPSOCKET_INTERRUPTED)
- m->c->connect_state = SSL_IN_PROGRESS; /* the connect is still in progress */
- else if (rc == SSL_FATAL)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- else if (rc == 1)
- {
- if (m->websocket)
- {
- m->c->connect_state = WEBSOCKET_IN_PROGRESS;
- rc = WebSocket_connect(&m->c->net, 1, serverURI);
- if ( rc == SOCKET_ERROR )
- goto exit;
- }
- else
- {
- rc = MQTTCLIENT_SUCCESS;
- m->c->connect_state = WAIT_FOR_CONNACK;
- if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
- m->c->session = SSL_get1_session(m->c->net.ssl);
- }
- }
- }
- else
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- #endif
- else
- {
- if (m->c->net.http_proxy) {
- m->c->connect_state = PROXY_CONNECT_IN_PROGRESS;
- if ((rc = Proxy_connect( &m->c->net, 0, serverURI)) == SOCKET_ERROR )
- goto exit;
- }
- if (m->websocket)
- {
- m->c->connect_state = WEBSOCKET_IN_PROGRESS;
- if ( WebSocket_connect(&m->c->net, 0, serverURI) == SOCKET_ERROR )
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- else
- {
- m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
- if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- }
- }
- #if defined(OPENSSL)
- if (m->c->connect_state == SSL_IN_PROGRESS) /* SSL connect sent - wait for completion */
- {
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
- Thread_lock_mutex(mqttclient_mutex);
- if (rc != 1)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- if((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
- m->c->session = SSL_get1_session(m->c->net.ssl);
- if ( m->websocket )
- {
- /* wait for websocket connect */
- m->c->connect_state = WEBSOCKET_IN_PROGRESS;
- rc = WebSocket_connect( &m->c->net, 1, serverURI);
- if ( rc != 1 )
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- else
- {
- m->c->connect_state = WAIT_FOR_CONNACK; /* TCP connect completed, in which case send the MQTT connect packet */
- if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- }
- #endif
- if (m->c->connect_state == WEBSOCKET_IN_PROGRESS) /* websocket request sent - wait for upgrade */
- {
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_waitfor(handle, CONNECT, &rc, millisecsTimeout - MQTTTime_elapsed(start));
- Thread_lock_mutex(mqttclient_mutex);
- m->c->connect_state = WAIT_FOR_CONNACK; /* websocket upgrade complete */
- if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
- {
- rc = SOCKET_ERROR;
- goto exit;
- }
- }
- if (m->c->connect_state == WAIT_FOR_CONNACK) /* MQTT connect sent - wait for CONNACK */
- {
- MQTTPacket* pack = NULL;
- Thread_unlock_mutex(mqttclient_mutex);
- pack = MQTTClient_waitfor(handle, CONNACK, &rc, millisecsTimeout - MQTTTime_elapsed(start));
- Thread_lock_mutex(mqttclient_mutex);
- if (pack == NULL)
- rc = SOCKET_ERROR;
- else
- {
- Connack* connack = (Connack*)pack;
- Log(TRACE_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
- if ((rc = connack->rc) == MQTTCLIENT_SUCCESS)
- {
- m->c->connected = 1;
- m->c->good = 1;
- m->c->connect_state = NOT_IN_PROGRESS;
- if (MQTTVersion == 4)
- sessionPresent = connack->flags.bits.sessionPresent;
- if (m->c->cleansession || m->c->cleanstart)
- rc = MQTTClient_cleanSession(m->c);
- if (m->c->outboundMsgs->count > 0)
- {
- ListElement* outcurrent = NULL;
- START_TIME_TYPE zero = START_TIME_ZERO;
- while (ListNextElement(m->c->outboundMsgs, &outcurrent))
- {
- Messages* m = (Messages*)(outcurrent->content);
- memset(&m->lastTouch, '\0', sizeof(m->lastTouch));
- }
- MQTTProtocol_retry(zero, 1, 1);
- if (m->c->connected != 1)
- rc = MQTTCLIENT_DISCONNECTED;
- }
- if (m->c->MQTTVersion == MQTTVERSION_5)
- {
- if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- *resp.properties = MQTTProperties_copy(&connack->properties);
- }
- }
- MQTTPacket_freeConnack(connack);
- m->pack = NULL;
- }
- }
- exit:
- if (rc == MQTTCLIENT_SUCCESS)
- {
- if (options->struct_version >= 4) /* means we have to fill out return values */
- {
- options->returned.serverURI = serverURI;
- options->returned.MQTTVersion = MQTTVersion;
- options->returned.sessionPresent = sessionPresent;
- }
- }
- else
- MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3), MQTTREASONCODE_SUCCESS, NULL); /* don't want to call connection lost */
- resp.reasonCode = rc;
- FUNC_EXIT_RC(resp.reasonCode);
- return resp;
- }
/* Interval between retry loop passes, in milliseconds. */
static int retryLoopIntervalms = 5000;

/**
 * Derive the retry loop interval from the keepalive interval: one tenth of the
 * keepalive, kept within the range 100 ms to 5000 ms.
 * @param keepalive the keepalive interval in seconds
 */
void setRetryLoopInterval(int keepalive)
{
    enum { min_interval_ms = 100, max_interval_ms = 5000, ms_per_keepalive_second = 100 };

    /* Clamp before multiplying so that a very large (or very negative)
     * keepalive cannot overflow the int arithmetic. */
    if (keepalive >= max_interval_ms / ms_per_keepalive_second)
        retryLoopIntervalms = max_interval_ms;
    else if (keepalive <= min_interval_ms / ms_per_keepalive_second)
        retryLoopIntervalms = min_interval_ms;
    else
        retryLoopIntervalms = keepalive * ms_per_keepalive_second;
}
- static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
- MQTTProperties* connectProperties, MQTTProperties* willProperties)
- {
- MQTTClients* m = handle;
- START_TIME_TYPE start;
- ELAPSED_TIME_TYPE millisecsTimeout = 30000L;
- MQTTResponse rc = MQTTResponse_initializer;
- int MQTTVersion = 0;
- FUNC_ENTRY;
- rc.reasonCode = SOCKET_ERROR;
- millisecsTimeout = options->connectTimeout * 1000;
- start = MQTTTime_start_clock();
- m->currentServerURI = serverURI;
- m->c->keepAliveInterval = options->keepAliveInterval;
- m->c->retryInterval = options->retryInterval;
- setRetryLoopInterval(options->keepAliveInterval);
- m->c->MQTTVersion = options->MQTTVersion;
- m->c->cleanstart = m->c->cleansession = 0;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- m->c->cleanstart = options->cleanstart;
- else
- m->c->cleansession = options->cleansession;
- m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
- if (options->struct_version >= 6)
- {
- if (options->maxInflightMessages > 0)
- m->c->maxInflightMessages = options->maxInflightMessages;
- }
- if (options->struct_version >= 7)
- {
- m->c->net.httpHeaders = options->httpHeaders;
- }
- if (options->struct_version >= 8)
- {
- if (options->httpProxy)
- m->c->httpProxy = MQTTStrdup(options->httpProxy);
- if (options->httpsProxy)
- m->c->httpsProxy = MQTTStrdup(options->httpsProxy);
- }
- if (m->c->will)
- {
- free(m->c->will->payload);
- free(m->c->will->topic);
- free(m->c->will);
- m->c->will = NULL;
- }
- if (options->will && (options->will->struct_version == 0 || options->will->struct_version == 1))
- {
- const void* source = NULL;
- if ((m->c->will = malloc(sizeof(willMessages))) == NULL)
- {
- rc.reasonCode = PAHO_MEMORY_ERROR;
- goto exit;
- }
- if (options->will->message || (options->will->struct_version == 1 && options->will->payload.data))
- {
- if (options->will->struct_version == 1 && options->will->payload.data)
- {
- m->c->will->payloadlen = options->will->payload.len;
- source = options->will->payload.data;
- }
- else
- {
- m->c->will->payloadlen = (int)strlen(options->will->message);
- source = (void*)options->will->message;
- }
- if ((m->c->will->payload = malloc(m->c->will->payloadlen)) == NULL)
- {
- free(m->c->will);
- rc.reasonCode = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memcpy(m->c->will->payload, source, m->c->will->payloadlen);
- }
- else
- {
- m->c->will->payload = NULL;
- m->c->will->payloadlen = 0;
- }
- m->c->will->qos = options->will->qos;
- m->c->will->retained = options->will->retained;
- m->c->will->topic = MQTTStrdup(options->will->topicName);
- }
- #if defined(OPENSSL)
- if (m->c->sslopts)
- {
- if (m->c->sslopts->trustStore)
- free((void*)m->c->sslopts->trustStore);
- if (m->c->sslopts->keyStore)
- free((void*)m->c->sslopts->keyStore);
- if (m->c->sslopts->privateKey)
- free((void*)m->c->sslopts->privateKey);
- if (m->c->sslopts->privateKeyPassword)
- free((void*)m->c->sslopts->privateKeyPassword);
- if (m->c->sslopts->enabledCipherSuites)
- free((void*)m->c->sslopts->enabledCipherSuites);
- if (m->c->sslopts->struct_version >= 2)
- {
- if (m->c->sslopts->CApath)
- free((void*)m->c->sslopts->CApath);
- }
- free(m->c->sslopts);
- m->c->sslopts = NULL;
- }
- if (options->struct_version != 0 && options->ssl)
- {
- if ((m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions))) == NULL)
- {
- rc.reasonCode = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memset(m->c->sslopts, '\0', sizeof(MQTTClient_SSLOptions));
- m->c->sslopts->struct_version = options->ssl->struct_version;
- if (options->ssl->trustStore)
- m->c->sslopts->trustStore = MQTTStrdup(options->ssl->trustStore);
- if (options->ssl->keyStore)
- m->c->sslopts->keyStore = MQTTStrdup(options->ssl->keyStore);
- if (options->ssl->privateKey)
- m->c->sslopts->privateKey = MQTTStrdup(options->ssl->privateKey);
- if (options->ssl->privateKeyPassword)
- m->c->sslopts->privateKeyPassword = MQTTStrdup(options->ssl->privateKeyPassword);
- if (options->ssl->enabledCipherSuites)
- m->c->sslopts->enabledCipherSuites = MQTTStrdup(options->ssl->enabledCipherSuites);
- m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
- if (m->c->sslopts->struct_version >= 1)
- m->c->sslopts->sslVersion = options->ssl->sslVersion;
- if (m->c->sslopts->struct_version >= 2)
- {
- m->c->sslopts->verify = options->ssl->verify;
- if (options->ssl->CApath)
- m->c->sslopts->CApath = MQTTStrdup(options->ssl->CApath);
- }
- if (m->c->sslopts->struct_version >= 3)
- {
- m->c->sslopts->ssl_error_cb = options->ssl->ssl_error_cb;
- m->c->sslopts->ssl_error_context = options->ssl->ssl_error_context;
- }
- if (m->c->sslopts->struct_version >= 4)
- {
- m->c->sslopts->ssl_psk_cb = options->ssl->ssl_psk_cb;
- m->c->sslopts->ssl_psk_context = options->ssl->ssl_psk_context;
- m->c->sslopts->disableDefaultTrustStore = options->ssl->disableDefaultTrustStore;
- }
- if (m->c->sslopts->struct_version >= 5)
- {
- m->c->sslopts->protos = options->ssl->protos;
- m->c->sslopts->protos_len = options->ssl->protos_len;
- }
- }
- #endif
- if (m->c->username)
- {
- free((void*)m->c->username);
- m->c->username = NULL;
- }
- if (options->username)
- m->c->username = MQTTStrdup(options->username);
- if (m->c->password)
- {
- free((void*)m->c->password);
- m->c->password = NULL;
- }
- if (options->password)
- {
- m->c->password = MQTTStrdup(options->password);
- m->c->passwordlen = (int)strlen(options->password);
- }
- else if (options->struct_version >= 5 && options->binarypwd.data)
- {
- m->c->passwordlen = options->binarypwd.len;
- if ((m->c->password = malloc(m->c->passwordlen)) == NULL)
- {
- rc.reasonCode = PAHO_MEMORY_ERROR;
- goto exit;
- }
- memcpy((void*)m->c->password, options->binarypwd.data, m->c->passwordlen);
- }
- if (options->struct_version >= 3)
- MQTTVersion = options->MQTTVersion;
- else
- MQTTVersion = MQTTVERSION_DEFAULT;
- if (MQTTVersion == MQTTVERSION_DEFAULT)
- {
- rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout,
- connectProperties, willProperties);
- if (rc.reasonCode != MQTTCLIENT_SUCCESS)
- {
- rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout,
- connectProperties, willProperties);
- }
- }
- else
- rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout,
- connectProperties, willProperties);
- exit:
- FUNC_EXIT_RC(rc.reasonCode);
- return rc;
- }
- MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* options,
- MQTTProperties* connectProperties, MQTTProperties* willProperties);
- int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
- {
- MQTTClients* m = handle;
- MQTTResponse response;
- if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
- return MQTTCLIENT_WRONG_MQTT_VERSION;
- response = MQTTClient_connectAll(handle, options, NULL, NULL);
- return response.reasonCode;
- }
- MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* options,
- MQTTProperties* connectProperties, MQTTProperties* willProperties)
- {
- MQTTClients* m = handle;
- MQTTResponse response = MQTTResponse_initializer;
- if (m != NULL && m->c != NULL && m->c->MQTTVersion < MQTTVERSION_5)
- {
- response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
- return response;
- }
- return MQTTClient_connectAll(handle, options, connectProperties, willProperties);
- }
- MQTTResponse MQTTClient_connectAll(MQTTClient handle, MQTTClient_connectOptions* options,
- MQTTProperties* connectProperties, MQTTProperties* willProperties)
- {
- MQTTClients* m = handle;
- MQTTResponse rc = MQTTResponse_initializer;
- FUNC_ENTRY;
- Thread_lock_mutex(connect_mutex);
- Thread_lock_mutex(mqttclient_mutex);
- rc.reasonCode = SOCKET_ERROR;
- if (!library_initialized)
- {
- rc.reasonCode = MQTTCLIENT_FAILURE;
- goto exit;
- }
- if (options == NULL || m == NULL || m->c == NULL)
- {
- rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
- goto exit;
- }
- if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 8)
- {
- rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
- goto exit;
- }
- #if defined(OPENSSL)
- if (m->ssl && options->ssl == NULL)
- {
- rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
- goto exit;
- }
- #endif
- if (options->will) /* check validity of will options structure */
- {
- if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
- {
- rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
- goto exit;
- }
- if (options->will->qos < 0 || options->will->qos > 2)
- {
- rc.reasonCode = MQTTCLIENT_BAD_QOS;
- goto exit;
- }
- if (options->will->topicName == NULL)
- {
- rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
- goto exit;
- } else if (strlen(options->will->topicName) == 0)
- {
- rc.reasonCode = MQTTCLIENT_0_LEN_WILL_TOPIC;
- goto exit;
- }
- }
- #if defined(OPENSSL)
- if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
- {
- if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 5)
- {
- rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
- goto exit;
- }
- }
- #endif
- if ((options->username && !UTF8_validateString(options->username)) ||
- (options->password && !UTF8_validateString(options->password)))
- {
- rc.reasonCode = MQTTCLIENT_BAD_UTF8_STRING;
- goto exit;
- }
- if (options->MQTTVersion != MQTTVERSION_DEFAULT &&
- (options->MQTTVersion < MQTTVERSION_3_1 || options->MQTTVersion > MQTTVERSION_5))
- {
- rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
- goto exit;
- }
- if (options->MQTTVersion >= MQTTVERSION_5)
- {
- if (options->cleansession != 0)
- {
- rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
- goto exit;
- }
- }
- else if (options->cleanstart != 0)
- {
- rc.reasonCode = MQTTCLIENT_BAD_MQTT_OPTION;
- goto exit;
- }
- if (options->struct_version < 2 || options->serverURIcount == 0)
- {
- if ( !m )
- {
- rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
- goto exit;
- }
- rc = MQTTClient_connectURI(handle, options, m->serverURI, connectProperties, willProperties);
- }
- else
- {
- int i;
- for (i = 0; i < options->serverURIcount; ++i)
- {
- char* serverURI = options->serverURIs[i];
- if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
- serverURI += strlen(URI_TCP);
- else if (strncmp(URI_MQTT, serverURI, strlen(URI_MQTT)) == 0)
- serverURI += strlen(URI_TCP);
- else if (strncmp(URI_WS, serverURI, strlen(URI_WS)) == 0)
- {
- serverURI += strlen(URI_WS);
- m->websocket = 1;
- }
- #if defined(OPENSSL)
- else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
- {
- serverURI += strlen(URI_SSL);
- m->ssl = 1;
- }
- else if (strncmp(URI_MQTTS, serverURI, strlen(URI_MQTTS)) == 0)
- {
- serverURI += strlen(URI_MQTTS);
- m->ssl = 1;
- }
- else if (strncmp(URI_WSS, serverURI, strlen(URI_WSS)) == 0)
- {
- serverURI += strlen(URI_WSS);
- m->ssl = 1;
- m->websocket = 1;
- }
- #endif
- rc = MQTTClient_connectURI(handle, options, serverURI, connectProperties, willProperties);
- if (rc.reasonCode == MQTTREASONCODE_SUCCESS)
- break;
- }
- }
- if (rc.reasonCode == MQTTREASONCODE_SUCCESS)
- {
- if (rc.properties && MQTTProperties_hasProperty(rc.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM))
- {
- int recv_max = MQTTProperties_getNumericValue(rc.properties, MQTTPROPERTY_CODE_RECEIVE_MAXIMUM);
- if (m->c->maxInflightMessages > recv_max)
- m->c->maxInflightMessages = recv_max;
- }
- }
- exit:
- if (m && m->c && m->c->will)
- {
- if (m->c->will->payload)
- free(m->c->will->payload);
- if (m->c->will->topic)
- free(m->c->will->topic);
- free(m->c->will);
- m->c->will = NULL;
- }
- Thread_unlock_mutex(mqttclient_mutex);
- Thread_unlock_mutex(connect_mutex);
- FUNC_EXIT_RC(rc.reasonCode);
- return rc;
- }
- /**
- * mqttclient_mutex must be locked when you call this function, if multi threaded
- */
- static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int call_connection_lost, int stop,
- enum MQTTReasonCodes reason, MQTTProperties* props)
- {
- MQTTClients* m = handle;
- START_TIME_TYPE start;
- int rc = MQTTCLIENT_SUCCESS;
- int was_connected = 0;
- struct conlost_sync_data sync = {
- NULL, m
- };
- FUNC_ENTRY;
- if (m == NULL || m->c == NULL)
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- was_connected = m->c->connected; /* should be 1 */
- if (m->c->connected != 0)
- {
- start = MQTTTime_start_clock();
- m->c->connect_state = DISCONNECTING; /* indicate disconnecting */
- while (m->c->inboundMsgs->count > 0 || m->c->outboundMsgs->count > 0)
- { /* wait for all inflight message flows to finish, up to timeout */
- if (MQTTTime_elapsed(start) >= (ELAPSED_TIME_TYPE)timeout)
- break;
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_yield();
- Thread_lock_mutex(mqttclient_mutex);
- }
- }
- MQTTClient_closeSession(m->c, reason, props);
- exit:
- if (stop)
- MQTTClient_stop();
- if (call_connection_lost && m->cl && was_connected)
- {
- sync.sem = Thread_create_sem(&rc);
- Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
- Thread_start(connectionLost_call, &sync);
- Thread_wait_sem(sync.sem, 5000);
- Thread_destroy_sem(sync.sem);
- }
- FUNC_EXIT_RC(rc);
- return rc;
- }
- /**
- * mqttclient_mutex must be locked when you call this function, if multi threaded
- */
- static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout)
- {
- return MQTTClient_disconnect1(handle, timeout, 1, 1, MQTTREASONCODE_SUCCESS, NULL);
- }
- /**
- * mqttclient_mutex must be locked when you call this function, if multi threaded
- */
- void MQTTProtocol_closeSession(Clients* c, int sendwill)
- {
- MQTTClient_disconnect_internal((MQTTClient)c->context, 0);
- }
- int MQTTClient_disconnect(MQTTClient handle, int timeout)
- {
- int rc = 0;
- Thread_lock_mutex(mqttclient_mutex);
- rc = MQTTClient_disconnect1(handle, timeout, 0, 1, MQTTREASONCODE_SUCCESS, NULL);
- Thread_unlock_mutex(mqttclient_mutex);
- return rc;
- }
- int MQTTClient_disconnect5(MQTTClient handle, int timeout, enum MQTTReasonCodes reason, MQTTProperties* props)
- {
- int rc = 0;
- Thread_lock_mutex(mqttclient_mutex);
- rc = MQTTClient_disconnect1(handle, timeout, 0, 1, reason, props);
- Thread_unlock_mutex(mqttclient_mutex);
- return rc;
- }
- int MQTTClient_isConnected(MQTTClient handle)
- {
- MQTTClients* m = handle;
- int rc = 0;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m && m->c)
- rc = m->c->connected;
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- MQTTResponse MQTTClient_subscribeMany5(MQTTClient handle, int count, char* const* topic,
- int* qos, MQTTSubscribe_options* opts, MQTTProperties* props)
- {
- MQTTClients* m = handle;
- List* topics = NULL;
- List* qoss = NULL;
- int i = 0;
- int rc = MQTTCLIENT_FAILURE;
- MQTTResponse resp = MQTTResponse_initializer;
- int msgid = 0;
- FUNC_ENTRY;
- Thread_lock_mutex(subscribe_mutex);
- Thread_lock_mutex(mqttclient_mutex);
- resp.reasonCode = MQTTCLIENT_FAILURE;
- if (m == NULL || m->c == NULL)
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- if (m->c->connected == 0)
- {
- rc = MQTTCLIENT_DISCONNECTED;
- goto exit;
- }
- for (i = 0; i < count; i++)
- {
- if (!UTF8_validateString(topic[i]))
- {
- rc = MQTTCLIENT_BAD_UTF8_STRING;
- goto exit;
- }
- if (qos[i] < 0 || qos[i] > 2)
- {
- rc = MQTTCLIENT_BAD_QOS;
- goto exit;
- }
- }
- if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
- {
- rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
- goto exit;
- }
- topics = ListInitialize();
- qoss = ListInitialize();
- for (i = 0; i < count; i++)
- {
- ListAppend(topics, topic[i], strlen(topic[i]));
- ListAppend(qoss, &qos[i], sizeof(int));
- }
- rc = MQTTProtocol_subscribe(m->c, topics, qoss, msgid, opts, props);
- ListFreeNoContent(topics);
- ListFreeNoContent(qoss);
- if (rc == TCPSOCKET_COMPLETE)
- {
- MQTTPacket* pack = NULL;
- Thread_unlock_mutex(mqttclient_mutex);
- pack = MQTTClient_waitfor(handle, SUBACK, &rc, m->commandTimeout);
- Thread_lock_mutex(mqttclient_mutex);
- if (pack != NULL)
- {
- Suback* sub = (Suback*)pack;
- if (m->c->MQTTVersion == MQTTVERSION_5)
- {
- if (sub->properties.count > 0)
- {
- if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- *resp.properties = MQTTProperties_copy(&sub->properties);
- }
- resp.reasonCodeCount = sub->qoss->count;
- resp.reasonCode = *(int*)sub->qoss->first->content;
- if (sub->qoss->count > 1)
- {
- ListElement* current = NULL;
- int rc_count = 0;
- if ((resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (sub->qoss->count))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- while (ListNextElement(sub->qoss, ¤t))
- (resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
- }
- }
- else
- {
- ListElement *current = NULL;
- /* if the returned count is greater than requested, it's an error*/
- if (sub->qoss->count > count)
- rc = MQTTCLIENT_FAILURE;
- else
- {
- i = 0;
- while (ListNextElement(sub->qoss, ¤t))
- {
- int *reqqos = (int*) (current->content);
- qos[i++] = *reqqos;
- }
- }
- resp.reasonCode = rc;
- }
- rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
- m->pack = NULL;
- }
- else
- rc = SOCKET_ERROR;
- }
- if (rc == SOCKET_ERROR)
- MQTTClient_disconnect_internal(handle, 0);
- else if (rc == TCPSOCKET_COMPLETE)
- rc = MQTTCLIENT_SUCCESS;
- exit:
- if (rc < 0)
- resp.reasonCode = rc;
- Thread_unlock_mutex(mqttclient_mutex);
- Thread_unlock_mutex(subscribe_mutex);
- FUNC_EXIT_RC(resp.reasonCode);
- return resp;
- }
- int MQTTClient_subscribeMany(MQTTClient handle, int count, char* const* topic, int* qos)
- {
- MQTTClients* m = handle;
- MQTTResponse response = MQTTResponse_initializer;
- if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
- response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
- else
- response = MQTTClient_subscribeMany5(handle, count, topic, qos, NULL, NULL);
- return response.reasonCode;
- }
- MQTTResponse MQTTClient_subscribe5(MQTTClient handle, const char* topic, int qos,
- MQTTSubscribe_options* opts, MQTTProperties* props)
- {
- MQTTResponse rc;
- FUNC_ENTRY;
- rc = MQTTClient_subscribeMany5(handle, 1, (char * const *)(&topic), &qos, opts, props);
- if (qos == MQTT_BAD_SUBSCRIBE) /* addition for MQTT 3.1.1 - error code from subscribe */
- rc.reasonCode = MQTT_BAD_SUBSCRIBE;
- FUNC_EXIT_RC(rc.reasonCode);
- return rc;
- }
- int MQTTClient_subscribe(MQTTClient handle, const char* topic, int qos)
- {
- MQTTClients* m = handle;
- MQTTResponse response = MQTTResponse_initializer;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
- else
- response = MQTTClient_subscribe5(handle, topic, qos, NULL, NULL);
- return response.reasonCode;
- }
- MQTTResponse MQTTClient_unsubscribeMany5(MQTTClient handle, int count, char* const* topic, MQTTProperties* props)
- {
- MQTTClients* m = handle;
- List* topics = NULL;
- int i = 0;
- int rc = SOCKET_ERROR;
- MQTTResponse resp = MQTTResponse_initializer;
- int msgid = 0;
- FUNC_ENTRY;
- Thread_lock_mutex(unsubscribe_mutex);
- Thread_lock_mutex(mqttclient_mutex);
- resp.reasonCode = MQTTCLIENT_FAILURE;
- if (m == NULL || m->c == NULL)
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- if (m->c->connected == 0)
- {
- rc = MQTTCLIENT_DISCONNECTED;
- goto exit;
- }
- for (i = 0; i < count; i++)
- {
- if (!UTF8_validateString(topic[i]))
- {
- rc = MQTTCLIENT_BAD_UTF8_STRING;
- goto exit;
- }
- }
- if ((msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
- {
- rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
- goto exit;
- }
- topics = ListInitialize();
- for (i = 0; i < count; i++)
- ListAppend(topics, topic[i], strlen(topic[i]));
- rc = MQTTProtocol_unsubscribe(m->c, topics, msgid, props);
- ListFreeNoContent(topics);
- if (rc == TCPSOCKET_COMPLETE)
- {
- MQTTPacket* pack = NULL;
- Thread_unlock_mutex(mqttclient_mutex);
- pack = MQTTClient_waitfor(handle, UNSUBACK, &rc, m->commandTimeout);
- Thread_lock_mutex(mqttclient_mutex);
- if (pack != NULL)
- {
- Unsuback* unsub = (Unsuback*)pack;
- if (m->c->MQTTVersion == MQTTVERSION_5)
- {
- if (unsub->properties.count > 0)
- {
- if ((resp.properties = malloc(sizeof(MQTTProperties))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- *resp.properties = MQTTProperties_copy(&unsub->properties);
- }
- resp.reasonCodeCount = unsub->reasonCodes->count;
- resp.reasonCode = *(int*)unsub->reasonCodes->first->content;
- if (unsub->reasonCodes->count > 1)
- {
- ListElement* current = NULL;
- int rc_count = 0;
- if ((resp.reasonCodes = malloc(sizeof(enum MQTTReasonCodes) * (unsub->reasonCodes->count))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- while (ListNextElement(unsub->reasonCodes, ¤t))
- (resp.reasonCodes)[rc_count++] = *(enum MQTTReasonCodes*)(current->content);
- }
- }
- else
- resp.reasonCode = rc;
- rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
- m->pack = NULL;
- }
- else
- rc = SOCKET_ERROR;
- }
- if (rc == SOCKET_ERROR)
- MQTTClient_disconnect_internal(handle, 0);
- exit:
- if (rc < 0)
- resp.reasonCode = rc;
- Thread_unlock_mutex(mqttclient_mutex);
- Thread_unlock_mutex(unsubscribe_mutex);
- FUNC_EXIT_RC(resp.reasonCode);
- return resp;
- }
- int MQTTClient_unsubscribeMany(MQTTClient handle, int count, char* const* topic)
- {
- MQTTClients* m = handle;
- MQTTResponse response = MQTTResponse_initializer;
- if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
- response.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
- else
- response = MQTTClient_unsubscribeMany5(handle, count, topic, NULL);
- return response.reasonCode;
- }
- MQTTResponse MQTTClient_unsubscribe5(MQTTClient handle, const char* topic, MQTTProperties* props)
- {
- MQTTResponse rc;
- rc = MQTTClient_unsubscribeMany5(handle, 1, (char * const *)(&topic), props);
- return rc;
- }
- int MQTTClient_unsubscribe(MQTTClient handle, const char* topic)
- {
- MQTTResponse response = MQTTClient_unsubscribe5(handle, topic, NULL);
- return response.reasonCode;
- }
- MQTTResponse MQTTClient_publish5(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
- int qos, int retained, MQTTProperties* properties, MQTTClient_deliveryToken* deliveryToken)
- {
- int rc = MQTTCLIENT_SUCCESS;
- MQTTClients* m = handle;
- Messages* msg = NULL;
- Publish* p = NULL;
- int blocked = 0;
- int msgid = 0;
- MQTTResponse resp = MQTTResponse_initializer;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL || m->c == NULL)
- rc = MQTTCLIENT_FAILURE;
- else if (m->c->connected == 0)
- rc = MQTTCLIENT_DISCONNECTED;
- else if (!UTF8_validateString(topicName))
- rc = MQTTCLIENT_BAD_UTF8_STRING;
- if (rc != MQTTCLIENT_SUCCESS)
- goto exit;
- /* If outbound queue is full, block until it is not */
- while (m->c->outboundMsgs->count >= m->c->maxInflightMessages ||
- Socket_noPendingWrites(m->c->net.socket) == 0) /* wait until the socket is free of large packets being written */
- {
- if (blocked == 0)
- {
- blocked = 1;
- Log(TRACE_MIN, -1, "Blocking publish on queue full for client %s", m->c->clientID);
- }
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_yield();
- Thread_lock_mutex(mqttclient_mutex);
- if (m->c->connected == 0)
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- }
- if (blocked == 1)
- Log(TRACE_MIN, -1, "Resuming publish now queue not full for client %s", m->c->clientID);
- if (qos > 0 && (msgid = MQTTProtocol_assignMsgId(m->c)) == 0)
- { /* this should never happen as we've waited for spaces in the queue */
- rc = MQTTCLIENT_MAX_MESSAGES_INFLIGHT;
- goto exit;
- }
- if ((p = malloc(sizeof(Publish))) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit_and_free;
- }
- memset(p->mask, '\0', sizeof(p->mask));
- p->payload = NULL;
- p->payloadlen = payloadlen;
- if (payloadlen > 0)
- {
- if ((p->payload = malloc(payloadlen)) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit_and_free;
- }
- memcpy(p->payload, payload, payloadlen);
- }
- if ((p->topic = MQTTStrdup(topicName)) == NULL)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit_and_free;
- }
- p->msgId = msgid;
- p->MQTTVersion = m->c->MQTTVersion;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- {
- if (properties)
- p->properties = *properties;
- else
- {
- MQTTProperties props = MQTTProperties_initializer;
- p->properties = props;
- }
- }
- rc = MQTTProtocol_startPublish(m->c, p, qos, retained, &msg);
- /* If the packet was partially written to the socket, wait for it to complete.
- * However, if the client is disconnected during this time and qos is not 0, still return success, as
- * the packet has already been written to persistence and assigned a message id so will
- * be sent when the client next connects.
- */
- if (rc == TCPSOCKET_INTERRUPTED)
- {
- while (m->c->connected == 1)
- {
- pending_writes* writing = NULL;
- Thread_lock_mutex(socket_mutex);
- writing = SocketBuffer_getWrite(m->c->net.socket);
- Thread_unlock_mutex(socket_mutex);
- if (writing == NULL)
- break;
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_yield();
- Thread_lock_mutex(mqttclient_mutex);
- }
- rc = (qos > 0 || m->c->connected == 1) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
- }
- if (deliveryToken && qos > 0)
- *deliveryToken = msg->msgid;
- exit_and_free:
- if (p)
- {
- if (p->topic)
- free(p->topic);
- if (p->payload)
- free(p->payload);
- free(p);
- }
- if (rc == SOCKET_ERROR)
- {
- MQTTClient_disconnect_internal(handle, 0);
- /* Return success for qos > 0 as the send will be retried automatically */
- rc = (qos > 0) ? MQTTCLIENT_SUCCESS : MQTTCLIENT_FAILURE;
- }
- exit:
- Thread_unlock_mutex(mqttclient_mutex);
- resp.reasonCode = rc;
- FUNC_EXIT_RC(resp.reasonCode);
- return resp;
- }
- int MQTTClient_publish(MQTTClient handle, const char* topicName, int payloadlen, const void* payload,
- int qos, int retained, MQTTClient_deliveryToken* deliveryToken)
- {
- MQTTClients* m = handle;
- MQTTResponse rc = MQTTResponse_initializer;
- if (m->c->MQTTVersion >= MQTTVERSION_5)
- rc.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
- else
- rc = MQTTClient_publish5(handle, topicName, payloadlen, payload, qos, retained, NULL, deliveryToken);
- return rc.reasonCode;
- }
- MQTTResponse MQTTClient_publishMessage5(MQTTClient handle, const char* topicName, MQTTClient_message* message,
- MQTTClient_deliveryToken* deliveryToken)
- {
- MQTTResponse rc = MQTTResponse_initializer;
- MQTTProperties* props = NULL;
- FUNC_ENTRY;
- if (message == NULL)
- {
- rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
- goto exit;
- }
- if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
- (message->struct_version != 0 && message->struct_version != 1))
- {
- rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
- goto exit;
- }
- if (message->struct_version >= 1)
- props = &message->properties;
- rc = MQTTClient_publish5(handle, topicName, message->payloadlen, message->payload,
- message->qos, message->retained, props, deliveryToken);
- exit:
- FUNC_EXIT_RC(rc.reasonCode);
- return rc;
- }
- int MQTTClient_publishMessage(MQTTClient handle, const char* topicName, MQTTClient_message* message,
- MQTTClient_deliveryToken* deliveryToken)
- {
- MQTTClients* m = handle;
- MQTTResponse rc = MQTTResponse_initializer;
- if (strncmp(message->struct_id, "MQTM", 4) != 0 ||
- (message->struct_version != 0 && message->struct_version != 1))
- rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
- else if (m != NULL && m->c != NULL && m->c->MQTTVersion >= MQTTVERSION_5)
- rc.reasonCode = MQTTCLIENT_WRONG_MQTT_VERSION;
- else
- rc = MQTTClient_publishMessage5(handle, topicName, message, deliveryToken);
- return rc.reasonCode;
- }
- static void MQTTClient_retry(void)
- {
- static START_TIME_TYPE last = START_TIME_ZERO;
- START_TIME_TYPE now;
- FUNC_ENTRY;
- now = MQTTTime_now();
- if (MQTTTime_difftime(now, last) >= (DIFF_TIME_TYPE)(retryLoopIntervalms))
- {
- last = MQTTTime_now();
- MQTTProtocol_keepalive(now);
- MQTTProtocol_retry(now, 1, 0);
- }
- else
- MQTTProtocol_retry(now, 0, 0);
- FUNC_EXIT;
- }
- static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int* rc)
- {
- static Ack ack;
- MQTTPacket* pack = NULL;
- int rc1 = 0;
- START_TIME_TYPE start;
- FUNC_ENTRY;
- #if defined(OPENSSL)
- if ((*sock = SSLSocket_getPendingRead()) == -1)
- {
- /* 0 from getReadySocket indicates no work to do, rc -1 == error */
- #endif
- start = MQTTTime_start_clock();
- *sock = Socket_getReadySocket(0, (int)timeout, socket_mutex, rc);
- *rc = rc1;
- if (*sock == 0 && timeout >= 100L && MQTTTime_elapsed(start) < (int64_t)10)
- MQTTTime_sleep(100L);
- #if defined(OPENSSL)
- }
- #endif
- Thread_lock_mutex(mqttclient_mutex);
- if (*sock > 0 && rc1 == 0)
- {
- MQTTClients* m = NULL;
- if (ListFindItem(handles, sock, clientSockCompare) != NULL)
- m = (MQTTClient)(handles->current->content);
- if (m != NULL)
- {
- if (m->c->connect_state == TCP_IN_PROGRESS || m->c->connect_state == SSL_IN_PROGRESS)
- *rc = 0; /* waiting for connect state to clear */
- else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS)
- *rc = WebSocket_upgrade(&m->c->net);
- else
- {
- pack = MQTTPacket_Factory(m->c->MQTTVersion, &m->c->net, rc);
- if (*rc == TCPSOCKET_INTERRUPTED)
- *rc = 0;
- }
- }
- if (pack)
- {
- int freed = 1;
- /* Note that these handle... functions free the packet structure that they are dealing with */
- if (pack->header.bits.type == PUBLISH)
- *rc = MQTTProtocol_handlePublishes(pack, *sock);
- else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP)
- {
- int msgid;
- ack = (pack->header.bits.type == PUBCOMP) ? *(Pubcomp*)pack : *(Puback*)pack;
- msgid = ack.msgId;
- if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published)
- {
- Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, msgid);
- (*(m->published))(m->published_context, msgid, pack->header.bits.type, &ack.properties, ack.rc);
- }
- *rc = (pack->header.bits.type == PUBCOMP) ?
- MQTTProtocol_handlePubcomps(pack, *sock, NULL) : MQTTProtocol_handlePubacks(pack, *sock, NULL);
- if (m && m->dc)
- {
- Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
- (*(m->dc))(m->context, msgid);
- }
- }
- else if (pack->header.bits.type == PUBREC)
- {
- Pubrec* pubrec = (Pubrec*)pack;
- if (m && m->c->MQTTVersion >= MQTTVERSION_5 && m->published && pubrec->rc >= MQTTREASONCODE_UNSPECIFIED_ERROR)
- {
- Log(TRACE_MIN, -1, "Calling published for client %s, msgid %d", m->c->clientID, ack.msgId);
- (*(m->published))(m->published_context, pubrec->msgId, pack->header.bits.type,
- &pubrec->properties, pubrec->rc);
- }
- *rc = MQTTProtocol_handlePubrecs(pack, *sock, NULL);
- }
- else if (pack->header.bits.type == PUBREL)
- *rc = MQTTProtocol_handlePubrels(pack, *sock);
- else if (pack->header.bits.type == PINGRESP)
- *rc = MQTTProtocol_handlePingresps(pack, *sock);
- else
- freed = 0;
- if (freed)
- pack = NULL;
- }
- }
- MQTTClient_retry();
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(*rc);
- return pack;
- }
- static MQTTPacket* MQTTClient_waitfor(MQTTClient handle, int packet_type, int* rc, int64_t timeout)
- {
- MQTTPacket* pack = NULL;
- MQTTClients* m = handle;
- START_TIME_TYPE start = MQTTTime_start_clock();
- int is_running = 0; /* local copy of running */
- FUNC_ENTRY;
- if (((MQTTClients*)handle) == NULL || timeout <= 0L)
- {
- *rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- Thread_lock_mutex(mqttclient_mutex);
- is_running = running;
- Thread_unlock_mutex(mqttclient_mutex);
- if (is_running)
- {
- if (packet_type == CONNECT)
- {
- if ((*rc = Thread_wait_sem(m->connect_sem, (int)timeout)) == 0)
- *rc = m->rc;
- }
- else if (packet_type == CONNACK)
- *rc = Thread_wait_sem(m->connack_sem, (int)timeout);
- else if (packet_type == SUBACK)
- *rc = Thread_wait_sem(m->suback_sem, (int)timeout);
- else if (packet_type == UNSUBACK)
- *rc = Thread_wait_sem(m->unsuback_sem, (int)timeout);
- if (*rc == 0 && packet_type != CONNECT && m->pack == NULL)
- Log(LOG_ERROR, -1, "waitfor unexpectedly is NULL for client %s, packet_type %d, timeout %ld", m->c->clientID, packet_type, timeout);
- pack = m->pack;
- }
- else
- {
- *rc = TCPSOCKET_COMPLETE;
- while (1)
- {
- SOCKET sock = -1;
- pack = MQTTClient_cycle(&sock, 100L, rc);
- if (sock == m->c->net.socket)
- {
- if (*rc == SOCKET_ERROR)
- break;
- if (pack && (pack->header.bits.type == packet_type))
- break;
- if (m->c->connect_state == TCP_IN_PROGRESS)
- {
- int error;
- socklen_t len = sizeof(error);
- if ((*rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
- *rc = error;
- break;
- }
- #if defined(OPENSSL)
- else if (m->c->connect_state == SSL_IN_PROGRESS)
- {
- *rc = m->c->sslopts->struct_version >= 3 ?
- SSLSocket_connect(m->c->net.ssl, sock, m->currentServerURI,
- m->c->sslopts->verify, m->c->sslopts->ssl_error_cb, m->c->sslopts->ssl_error_context) :
- SSLSocket_connect(m->c->net.ssl, sock, m->currentServerURI,
- m->c->sslopts->verify, NULL, NULL);
- if (*rc == SSL_FATAL)
- break;
- else if (*rc == 1) /* rc == 1 means SSL connect has finished and succeeded */
- {
- if ((m->c->cleansession == 0 && m->c->cleanstart == 0) && m->c->session == NULL)
- m->c->session = SSL_get1_session(m->c->net.ssl);
- break;
- }
- }
- #endif
- else if (m->c->connect_state == WEBSOCKET_IN_PROGRESS && *rc != TCPSOCKET_INTERRUPTED)
- {
- *rc = 1;
- break;
- }
- else if (m->c->connect_state == PROXY_CONNECT_IN_PROGRESS )
- {
- *rc = 1;
- break;
- }
- else if (m->c->connect_state == WAIT_FOR_CONNACK)
- {
- int error;
- socklen_t len = sizeof(error);
- if (getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len) == 0)
- {
- if (error)
- {
- *rc = error;
- break;
- }
- }
- }
- }
- if (MQTTTime_elapsed(start) > (uint64_t)timeout)
- {
- pack = NULL;
- break;
- }
- }
- }
- exit:
- FUNC_EXIT_RC(*rc);
- return pack;
- }
- int MQTTClient_receive(MQTTClient handle, char** topicName, int* topicLen, MQTTClient_message** message,
- unsigned long timeout)
- {
- int rc = TCPSOCKET_COMPLETE;
- START_TIME_TYPE start = MQTTTime_start_clock();
- ELAPSED_TIME_TYPE elapsed = 0L;
- MQTTClients* m = handle;
- FUNC_ENTRY;
- if (m == NULL || m->c == NULL
- || running) /* receive is not meant to be called in a multi-thread environment */
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- if (m->c->connected == 0)
- {
- rc = MQTTCLIENT_DISCONNECTED;
- goto exit;
- }
- *topicName = NULL;
- *message = NULL;
- /* if there is already a message waiting, don't hang around but still do some packet handling */
- if (m->c->messageQueue->count > 0)
- timeout = 0L;
- elapsed = MQTTTime_elapsed(start);
- do
- {
- SOCKET sock = 0;
- MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
- if (rc == SOCKET_ERROR)
- {
- if (ListFindItem(handles, &sock, clientSockCompare) && /* find client corresponding to socket */
- (MQTTClient)(handles->current->content) == handle)
- break; /* there was an error on the socket we are interested in */
- }
- elapsed = MQTTTime_elapsed(start);
- }
- while (elapsed < timeout && m->c->messageQueue->count == 0);
- if (m->c->messageQueue->count > 0)
- rc = MQTTClient_deliverMessage(rc, m, topicName, topicLen, message);
- if (rc == SOCKET_ERROR)
- MQTTClient_disconnect_internal(handle, 0);
- exit:
- FUNC_EXIT_RC(rc);
- return rc;
- }
- void MQTTClient_yield(void)
- {
- START_TIME_TYPE start = MQTTTime_start_clock();
- ELAPSED_TIME_TYPE elapsed = 0L;
- ELAPSED_TIME_TYPE timeout = 100L;
- int rc = 0;
- FUNC_ENTRY;
- if (running) /* yield is not meant to be called in a multi-thread environment */
- {
- MQTTTime_sleep(timeout);
- goto exit;
- }
- elapsed = MQTTTime_elapsed(start);
- do
- {
- SOCKET sock = -1;
- MQTTClient_cycle(&sock, (timeout > elapsed) ? timeout - elapsed : 0L, &rc);
- Thread_lock_mutex(mqttclient_mutex);
- if (rc == SOCKET_ERROR && ListFindItem(handles, &sock, clientSockCompare))
- {
- MQTTClients* m = (MQTTClient)(handles->current->content);
- if (m->c->connect_state != DISCONNECTING)
- MQTTClient_disconnect_internal(m, 0);
- }
- Thread_unlock_mutex(mqttclient_mutex);
- elapsed = MQTTTime_elapsed(start);
- }
- while (elapsed < timeout);
- exit:
- FUNC_EXIT;
- }
- /*
- static int pubCompare(void* a, void* b)
- {
- Messages* msg = (Messages*)a;
- return msg->publish == (Publications*)b;
- }*/
- int MQTTClient_waitForCompletion(MQTTClient handle, MQTTClient_deliveryToken mdt, unsigned long timeout)
- {
- int rc = MQTTCLIENT_FAILURE;
- START_TIME_TYPE start = MQTTTime_start_clock();
- ELAPSED_TIME_TYPE elapsed = 0L;
- MQTTClients* m = handle;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL || m->c == NULL)
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- elapsed = MQTTTime_elapsed(start);
- while (elapsed < timeout)
- {
- if (m->c->connected == 0)
- {
- rc = MQTTCLIENT_DISCONNECTED;
- goto exit;
- }
- if (ListFindItem(m->c->outboundMsgs, &mdt, messageIDCompare) == NULL)
- {
- rc = MQTTCLIENT_SUCCESS; /* well we couldn't find it */
- goto exit;
- }
- Thread_unlock_mutex(mqttclient_mutex);
- MQTTClient_yield();
- Thread_lock_mutex(mqttclient_mutex);
- elapsed = MQTTTime_elapsed(start);
- }
- exit:
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- int MQTTClient_getPendingDeliveryTokens(MQTTClient handle, MQTTClient_deliveryToken **tokens)
- {
- int rc = MQTTCLIENT_SUCCESS;
- MQTTClients* m = handle;
- *tokens = NULL;
- FUNC_ENTRY;
- Thread_lock_mutex(mqttclient_mutex);
- if (m == NULL)
- {
- rc = MQTTCLIENT_FAILURE;
- goto exit;
- }
- if (m->c && m->c->outboundMsgs->count > 0)
- {
- ListElement* current = NULL;
- int count = 0;
- *tokens = malloc(sizeof(MQTTClient_deliveryToken) * (m->c->outboundMsgs->count + 1));
- if (!*tokens)
- {
- rc = PAHO_MEMORY_ERROR;
- goto exit;
- }
- while (ListNextElement(m->c->outboundMsgs, ¤t))
- {
- Messages* m = (Messages*)(current->content);
- (*tokens)[count++] = m->msgid;
- }
- (*tokens)[count] = -1;
- }
- exit:
- Thread_unlock_mutex(mqttclient_mutex);
- FUNC_EXIT_RC(rc);
- return rc;
- }
- void MQTTClient_setTraceLevel(enum MQTTCLIENT_TRACE_LEVELS level)
- {
- Log_setTraceLevel((enum LOG_LEVELS)level);
- }
- void MQTTClient_setTraceCallback(MQTTClient_traceCallback* callback)
- {
- Log_setTraceCallback((Log_traceCallback*)callback);
- }
- int MQTTClient_setCommandTimeout(MQTTClient handle, unsigned long milliSeconds)
- {
- int rc = MQTTCLIENT_SUCCESS;
- MQTTClients* m = handle;
- FUNC_ENTRY;
- if (milliSeconds < 5000L)
- rc = MQTTCLIENT_FAILURE;
- else
- m->commandTimeout = milliSeconds;
- FUNC_EXIT_RC(rc);
- return rc;
- }
- MQTTClient_nameValue* MQTTClient_getVersionInfo(void)
- {
- #define MAX_INFO_STRINGS 8
- static MQTTClient_nameValue libinfo[MAX_INFO_STRINGS + 1];
- int i = 0;
- libinfo[i].name = "Product name";
- libinfo[i++].value = "Eclipse Paho Synchronous MQTT C Client Library";
- libinfo[i].name = "Version";
- libinfo[i++].value = CLIENT_VERSION;
- libinfo[i].name = "Build level";
- libinfo[i++].value = BUILD_TIMESTAMP;
- #if defined(OPENSSL)
- libinfo[i].name = "OpenSSL version";
- libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
- libinfo[i].name = "OpenSSL flags";
- libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
- libinfo[i].name = "OpenSSL build timestamp";
- libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
- libinfo[i].name = "OpenSSL platform";
- libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
- libinfo[i].name = "OpenSSL directory";
- libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
- #endif
- libinfo[i].name = NULL;
- libinfo[i].value = NULL;
- return libinfo;
- }
- const char* MQTTClient_strerror(int code)
- {
- static char buf[30];
- int chars = 0;
- switch (code) {
- case MQTTCLIENT_SUCCESS:
- return "Success";
- case MQTTCLIENT_FAILURE:
- return "Failure";
- case MQTTCLIENT_DISCONNECTED:
- return "Disconnected";
- case MQTTCLIENT_MAX_MESSAGES_INFLIGHT:
- return "Maximum in-flight messages amount reached";
- case MQTTCLIENT_BAD_UTF8_STRING:
- return "Invalid UTF8 string";
- case MQTTCLIENT_NULL_PARAMETER:
- return "Invalid (NULL) parameter";
- case MQTTCLIENT_TOPICNAME_TRUNCATED:
- return "Topic containing NULL characters has been truncated";
- case MQTTCLIENT_BAD_STRUCTURE:
- return "Bad structure";
- case MQTTCLIENT_BAD_QOS:
- return "Invalid QoS value";
- case MQTTCLIENT_SSL_NOT_SUPPORTED:
- return "SSL is not supported";
- case MQTTCLIENT_BAD_MQTT_VERSION:
- return "Unrecognized MQTT version";
- case MQTTCLIENT_BAD_PROTOCOL:
- return "Invalid protocol scheme";
- case MQTTCLIENT_BAD_MQTT_OPTION:
- return "Options for wrong MQTT version";
- case MQTTCLIENT_WRONG_MQTT_VERSION:
- return "Client created for another version of MQTT";
- case MQTTCLIENT_0_LEN_WILL_TOPIC:
- return "Zero length will topic on connect";
- }
- chars = snprintf(buf, sizeof(buf), "Unknown error code %d", code);
- if (chars >= sizeof(buf))
- {
- buf[sizeof(buf)-1] = '\0';
- Log(LOG_ERROR, 0, "Error writing %d chars with snprintf", chars);
- }
- return buf;
- }
- /**
- * See if any pending writes have been completed, and cleanup if so.
- * Cleaning up means removing any publication data that was stored because the write did
- * not originally complete.
- */
- static void MQTTProtocol_checkPendingWrites(void)
- {
- FUNC_ENTRY;
- if (state.pending_writes.count > 0)
- {
- ListElement* le = state.pending_writes.first;
- while (le)
- {
- if (Socket_noPendingWrites(((pending_write*)(le->content))->socket))
- {
- MQTTProtocol_removePublication(((pending_write*)(le->content))->p);
- state.pending_writes.current = le;
- ListRemove(&(state.pending_writes), le->content); /* does NextElement itself */
- le = state.pending_writes.current;
- }
- else
- ListNextElement(&(state.pending_writes), &le);
- }
- }
- FUNC_EXIT;
- }
- static void MQTTClient_writeComplete(SOCKET socket, int rc)
- {
- ListElement* found = NULL;
- FUNC_ENTRY;
- /* a partial write is now complete for a socket - this will be on a publish*/
- MQTTProtocol_checkPendingWrites();
- /* find the client using this socket */
- if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
- {
- MQTTClients* m = (MQTTClients*)(found->content);
- m->c->net.lastSent = MQTTTime_now();
- }
- FUNC_EXIT;
- }
- static void MQTTClient_writeContinue(SOCKET socket)
- {
- ListElement* found = NULL;
- if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
- {
- MQTTClients* m = (MQTTClients*)(found->content);
- m->c->net.lastSent = MQTTTime_now();
- }
- }
|